Добавление нескольких очередей в размещенную службу
Я реализую очереди с помощью https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.1&tabs=visual-пример студии.
вот как выглядит мой код:
в startup.cs я добавляю свой размещенный сервис и фоновую очередь
services.AddHostedService<QueuedHostedService>(); services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
затем я реализую уровня сервиса, размещенного обслуживание и очереди фона, как следующие:
Что я уже пробовал:
namespace Services.Services { public class QueuedHostedService: BackgroundService { private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) { _serviceProvider = serviceProvider; TaskQueue = taskQueue; _logger = loggerFactory.CreateLogger < QueuedHostedService > (); } public IBackgroundTaskQueue TaskQueue { get; } protected override async Task ExecuteAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { var workItem = await TaskQueue.DequeueAsync(cancellationToken); try { await workItem(cancellationToken); } catch (Exception ex) { } } } } } public interface IBackgroundTaskQueue { void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem); Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken); } namespace Services.Services { public class BackgroundTaskQueue: IBackgroundTaskQueue { private ConcurrentQueue < Func < CancellationToken, Task >> _workItems = new ConcurrentQueue < Func < CancellationToken, Task >> (); private SemaphoreSlim _signal = new SemaphoreSlim(0); public void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem) { if (workItem == null) { throw new ArgumentNullException(nameof(workItem)); } _workItems.Enqueue(workItem); _signal.Release(); } public async Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken) { await _signal.WaitAsync(cancellationToken); _workItems.TryDequeue(out var workItem); return workItem; } } } // scoped service namespace Services.Services { public class ImportService: BaseService, IImportService { private readonly IFileProcessingService _scopedProcessingService; private readonly ConfigurationSettings _configurationSettings; public IBackgroundTaskQueue Queue { get; } private const string AZURE_BLOB_CONTAINER = "blobcontainer"; public IServiceProvider Services { get; } public ImportService(IServiceProvider services, IBackgroundTaskQueue queue): base(services) { Services = services; _configurationSettings = services.GetService < ConfigurationSettings > (); _scopedProcessingService = services.GetProcessingService(); Queue = queue; } // ---- Main file public async Task ImportFile(string filePath, long fileSize, int userId, FileFormatType fileFormat, TransactionsDataHeadersMap dataHeadersMap, string delimiter, string dateFormat) { await _scopedProcessingService.ImportFile(filePath, fileSize, userId, fileFormat, dataHeadersMap, delimiter, dateFormat); } public async Task UploadToBlobStorage(IFormFile file, int userId, TransactionalDataFileType type) { var fileFormat = GetFileFormat(file); var tempFilePath = await GetTemporaryPath(file); var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat; // ....... // ProcessFile(tempFilePath, fileFormat, file, type, userId); } private void ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, Tyoe type, int userId) { var delimiter = ","; Queue.QueueBackgroundWorkItem(async token => { using(var scope = Services.CreateScope()) { var scopedProcessingService = scope.ServiceProvider .GetRequiredService < IFileProcessingService > (); // do the processing switch (type) { case "csv": await scopedProcessingService.ImportFile(tempFilePath, file.Length, userId, fileFormat, new Headers(), delimiter ? ? ",", "yyyy-MM-dd"); break; } } }); } } }
Я добавляю элементы в очередь по запросу в контроллере. Теперь я хочу добавить еще одну очередь для обработки других запросов. Можно ли использовать другую очередь, используя тот же размещенный сервис? Мне трудно найти примеры, как это сделать. Должен ли я просто добавить еще один scoped servide и еще одну фоновую очередь?