Member 12885549 Ответов: 1

Элемент очереди процессов net core API


Я использую https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.1&tabs=visual-studio как ссылка для создания фоновых очередей служб и процессов.

У меня есть класс ImportService.cs, куда приходит csvfile из HTTP-запроса, затем я хочу добавить его в очередь, которая обрабатывает этот CSV-файл и записывает результаты в базу данных. Это мой класс обслуживания, где у меня есть
IBackgroundTaskQueue
пример

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CsvHelper;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

namespace Services.Services
{
public class ImportService : BaseService, IImportService
{
    private readonly IUploadDataRepository _uploadDataRepository;
    private readonly ConfigurationSettings _configurationSettings;
    public IBackgroundTaskQueue Queue { get; }
    private const string AZURE_BLOB_CONTAINER = "blobcontainer";

    public ImportService(IServiceProvider services, IBackgroundTaskQueue queue) : base(services)
    {
        _uploadDataRepository = services.GetUploadDataRepository();
        _configurationSettings = services.GetService<ConfigurationSettings>();
        Queue = queue;
    }

    public async Task UploadToBlobStorage(IFormFile file, int userId, Type type)
    {        
        var fileFormat = GetFileFormat(file);
        var tempFilePath = await GetTemporaryPath(file);
        var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
        string storageConnectionString = _configurationSettings.ConnectionStrings.BlobStorageConnection;
        CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString);
        var blobClient = account.CreateCloudBlobClient();

        // Make sure container is there
        var blobContainer = blobClient.GetContainerReference(AZURE_BLOB_CONTAINER);
        await blobContainer.CreateIfNotExistsAsync();

        // set the permission to blob type
        await blobContainer.SetPermissionsAsync(new BlobContainerPermissions { PublicAccess = BlobContainerPublicAccessType.Blob });
        CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(fileName);
        using (var fileStream = File.OpenRead(tempFilePath))
        {
          await blockBlob.UploadFromStreamAsync(fileStream);
        }
            // ADD FILE TO QUEUE AND PROCESS IT
        Queue.QueueBackgroundWorkItem(async token =>
        {
            Console.WriteLine("ITEM QUEUED PROCESS IT??");

        });
        await _uploadDataRepository.Add(uploadData);
    }
}


Ниже я добавлю классы, созданные из примера microsoft:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Services.Services.Contracts {
  public interface IBackgroundTaskQueue {
    void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem);
    Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken);
  }
}

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Services.Services {
  /// <summary>
  /// Queued Hosted Service class
  /// </summary>
  public abstract class QueuedHostedService: BackgroundService {
    private readonly ILogger _logger;
    private IBackgroundTaskQueue TaskQueue {
      get;
    }

    protected QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
      TaskQueue = taskQueue;
      _logger = loggerFactory.CreateLogger < QueuedHostedService > ();
      Console.WriteLine("QueuedHostedService initialized");
    }
    protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
      _logger.LogInformation("Queued Hosted Service is starting.");

      while (!cancellationToken.IsCancellationRequested) {
        var workItem = await TaskQueue.DequeueAsync(cancellationToken);

        try {
          await workItem(cancellationToken);
        } catch (Exception ex) {
          _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
        }
      }
    }

    private void DoWork(object state) {
      Console.WriteLine("PROCCESS FILEE???");
    }
  }
}

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Services.Services {
  public class BackgroundTaskQueue: IBackgroundTaskQueue {
    private ConcurrentQueue < Func < CancellationToken, Task >> _workItems =
      new ConcurrentQueue < Func < CancellationToken, Task >> ();

    private SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem) {
      if (workItem == null) {
        throw new ArgumentNullException(nameof(workItem));
      }

      _workItems.Enqueue(workItem);
      _signal.Release();
    }

    public async Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken) {
      await _signal.WaitAsync(cancellationToken);
      _workItems.TryDequeue(out
        var workItem);

      return workItem;
    }
  }
}


Что я уже пробовал:

Мой вопрос заключается в том, где должен быть обработан этот файл? В Импортсервисе? Или в QueuedHostedService? Если в
QueuedHostedService
как мне передать этот файл и получить к нему доступ? Что было бы лучшей практикой для этого? Я хотел творить
DoWork()<\pre> function in <pre>QueuedHostedService
который обрабатывает этот файл, но я не знаю, как это сделать. Или обработка должна выполняться в классе обслуживания импорта?

1 Ответов

Рейтинг:
12

JudyL_MD

Не могу сказать, как я использовал эту конкретную схему для очередей рабочих элементов, но логика одна и та же независимо от схемы. Вы выполняете обработку в рабочем элементе. Весь опубликованный код имеет дело со службой и ее очередями, но не с самим рабочим элементом.

Queue.QueueBackgroundWorkItem(async token =>
        {
            Console.WriteLine("ITEM QUEUED PROCESS IT??");
    });
это ключевая область. Вам нужно творить рабочий элемент, который будет помещен в очередь, и как часть этого создания сохраните большой двоичный объект, содержащий файл, полученный в рабочем элементе. Затем, когда рабочий элемент удаляется из очереди и выполняется, он использует этот сохраненный blob-объект и обрабатывает файл.


Member 12885549

Так что вся обработка должна идти внутрь здесь, верно?

 Queue.QueueBackgroundWorkItem(async token =>
        {
            Console.WriteLine("Processing goes here?");
    });

JudyL_MD

Да, именно там справочная страница, на которую вы ссылаетесь, выполняет свою работу. Замените эту строку записи действиями, которые вы хотите выполнить.