Элемент очереди процессов net core API
Я использую https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.1&tabs=visual-studio как ссылка для создания фоновых очередей служб и процессов.
У меня есть класс ImportService.cs, куда приходит csvfile из HTTP-запроса, затем я хочу добавить его в очередь, которая обрабатывает этот CSV-файл и записывает результаты в базу данных. Это мой класс обслуживания, где у меня есть
IBackgroundTaskQueueпример
using System; using System.Collections.Generic; using System.Diagnostics; using System.Globalization; using System.IO; using System.IO.Compression; using System.Linq; using System.Threading; using System.Threading.Tasks; using CsvHelper; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.DependencyInjection; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; namespace Services.Services { public class ImportService : BaseService, IImportService { private readonly IUploadDataRepository _uploadDataRepository; private readonly ConfigurationSettings _configurationSettings; public IBackgroundTaskQueue Queue { get; } private const string AZURE_BLOB_CONTAINER = "blobcontainer"; public ImportService(IServiceProvider services, IBackgroundTaskQueue queue) : base(services) { _uploadDataRepository = services.GetUploadDataRepository(); _configurationSettings = services.GetService<ConfigurationSettings>(); Queue = queue; } public async Task UploadToBlobStorage(IFormFile file, int userId, Type type) { var fileFormat = GetFileFormat(file); var tempFilePath = await GetTemporaryPath(file); var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat; string storageConnectionString = _configurationSettings.ConnectionStrings.BlobStorageConnection; CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString); var blobClient = account.CreateCloudBlobClient(); // Make sure container is there var blobContainer = blobClient.GetContainerReference(AZURE_BLOB_CONTAINER); await blobContainer.CreateIfNotExistsAsync(); // set the permission to blob type await blobContainer.SetPermissionsAsync(new BlobContainerPermissions { PublicAccess = BlobContainerPublicAccessType.Blob }); CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(fileName); using (var fileStream = File.OpenRead(tempFilePath)) { await blockBlob.UploadFromStreamAsync(fileStream); } // ADD FILE TO QUEUE AND PROCESS IT Queue.QueueBackgroundWorkItem(async token => { Console.WriteLine("ITEM QUEUED PROCESS IT??"); }); await _uploadDataRepository.Add(uploadData); } }
Ниже я добавлю классы, созданные из примера microsoft:
using System; using System.Threading; using System.Threading.Tasks; namespace Services.Services.Contracts { public interface IBackgroundTaskQueue { void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem); Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken); } } using System; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace Services.Services { /// <summary> /// Queued Hosted Service class /// </summary> public abstract class QueuedHostedService: BackgroundService { private readonly ILogger _logger; private IBackgroundTaskQueue TaskQueue { get; } protected QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) { TaskQueue = taskQueue; _logger = loggerFactory.CreateLogger < QueuedHostedService > (); Console.WriteLine("QueuedHostedService initialized"); } protected override async Task ExecuteAsync(CancellationToken cancellationToken) { _logger.LogInformation("Queued Hosted Service is starting."); while (!cancellationToken.IsCancellationRequested) { var workItem = await TaskQueue.DequeueAsync(cancellationToken); try { await workItem(cancellationToken); } catch (Exception ex) { _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem)); } } } private void DoWork(object state) { Console.WriteLine("PROCCESS FILEE???"); } } } using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace Services.Services { public class BackgroundTaskQueue: IBackgroundTaskQueue { private ConcurrentQueue < Func < CancellationToken, Task >> _workItems = new ConcurrentQueue < Func < CancellationToken, Task >> (); private SemaphoreSlim _signal = new SemaphoreSlim(0); public void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem) { if (workItem == null) { throw new ArgumentNullException(nameof(workItem)); } _workItems.Enqueue(workItem); _signal.Release(); } public async Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken) { await _signal.WaitAsync(cancellationToken); _workItems.TryDequeue(out var workItem); return workItem; } } }
Что я уже пробовал:
Мой вопрос заключается в том, где должен быть обработан этот файл? В Импортсервисе? Или в QueuedHostedService? Если в
QueuedHostedServiceкак мне передать этот файл и получить к нему доступ? Что было бы лучшей практикой для этого? Я хотел творить
DoWork()<\pre> function in <pre>QueuedHostedServiceкоторый обрабатывает этот файл, но я не знаю, как это сделать. Или обработка должна выполняться в классе обслуживания импорта?