Member 12885549 Ответов: 0

Добавление нескольких очередей в размещенную службу


Я реализую очереди с помощью https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.1&tabs=visual-пример студии.

вот как выглядит мой код:

в startup.cs я добавляю свой размещенный сервис и фоновую очередь
services.AddHostedService<QueuedHostedService>();
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();


затем я реализую уровня сервиса, размещенного обслуживание и очереди фона, как следующие:

Что я уже пробовал:

namespace Services.Services {
  public class QueuedHostedService: BackgroundService {
    private readonly ILogger _logger;
    private readonly IServiceProvider _serviceProvider;
    public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
      _serviceProvider = serviceProvider;
      TaskQueue = taskQueue;
      _logger = loggerFactory.CreateLogger < QueuedHostedService > ();
    }
    public IBackgroundTaskQueue TaskQueue {
      get;
    }

    protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
      while (!cancellationToken.IsCancellationRequested) {
        var workItem = await TaskQueue.DequeueAsync(cancellationToken);

        try {
          await workItem(cancellationToken);
        } catch (Exception ex) {

        }
      }
    }
  }
}


public interface IBackgroundTaskQueue {
  void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem);
  Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken);
}

namespace Services.Services {
  public class BackgroundTaskQueue: IBackgroundTaskQueue {
    private ConcurrentQueue < Func < CancellationToken, Task >> _workItems = new ConcurrentQueue < Func < CancellationToken, Task >> ();
    private SemaphoreSlim _signal = new SemaphoreSlim(0);
    public void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem) {
      if (workItem == null) {
        throw new ArgumentNullException(nameof(workItem));
      }
      _workItems.Enqueue(workItem);
      _signal.Release();
    }

    public async Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken) {
      await _signal.WaitAsync(cancellationToken);
      _workItems.TryDequeue(out
        var workItem);
      return workItem;
    }
  }
}


// scoped service
namespace Services.Services {
  public class ImportService: BaseService, IImportService {
    private readonly IFileProcessingService _scopedProcessingService;

    private readonly ConfigurationSettings _configurationSettings;
    public IBackgroundTaskQueue Queue {
      get;
    }
    private
    const string AZURE_BLOB_CONTAINER = "blobcontainer";

    public IServiceProvider Services {
      get;
    }

    public ImportService(IServiceProvider services, IBackgroundTaskQueue queue): base(services) {
      Services = services;
      _configurationSettings = services.GetService < ConfigurationSettings > ();
      _scopedProcessingService = services.GetProcessingService();
      Queue = queue;
    }

    // ---- Main file
    public async Task ImportFile(string filePath, long fileSize, int userId, FileFormatType fileFormat, TransactionsDataHeadersMap dataHeadersMap, string delimiter, string dateFormat) {
      await _scopedProcessingService.ImportFile(filePath, fileSize, userId, fileFormat, dataHeadersMap, delimiter, dateFormat);
    }

    public async Task UploadToBlobStorage(IFormFile file, int userId, TransactionalDataFileType type) {
      var fileFormat = GetFileFormat(file);
      var tempFilePath = await GetTemporaryPath(file);
      var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
      // ....... //

      ProcessFile(tempFilePath, fileFormat, file, type, userId);
    }

    private void ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, Tyoe type, int userId) {
      var delimiter = ",";
      Queue.QueueBackgroundWorkItem(async token => {
        using(var scope = Services.CreateScope()) {
          var scopedProcessingService =
            scope.ServiceProvider
            .GetRequiredService < IFileProcessingService > ();

          // do the processing
          switch (type) {
            case "csv":
              await scopedProcessingService.ImportFile(tempFilePath, file.Length, userId, fileFormat, new Headers(), delimiter ? ? ",", "yyyy-MM-dd");
              break;
          }
        }
      });
    }
  }
}


Я добавляю элементы в очередь по запросу в контроллере. Теперь я хочу добавить еще одну очередь для обработки других запросов. Можно ли использовать другую очередь, используя тот же размещенный сервис? Мне трудно найти примеры, как это сделать. Должен ли я просто добавить еще один scoped servide и еще одну фоновую очередь?

0 Ответов