// Files
// server/src/Core/HostedServices/ApplicationCacheHostedService.cs
//
// Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.
//
// 125 lines
// 5.0 KiB
// C#
// Raw Normal View History

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Bit.Core.Enums;
using Bit.Core.Repositories;
using Bit.Core.Services;
using Bit.Core.Settings;
using Bit.Core.Utilities;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Bit.Core.HostedServices;
// 2022-08-29 16:06:55 -04:00
/// <summary>
/// Hosted service that creates a Service Bus subscription on the application cache topic,
/// receives messages from it in a background loop, and applies organization-ability
/// upserts and deletes to the <see cref="InMemoryServiceBusApplicationCacheService"/>.
/// </summary>
public class ApplicationCacheHostedService : IHostedService, IDisposable
{
    // Null when the injected IApplicationCacheService is not the in-memory Service Bus
    // implementation; messages are then completed without being applied.
    private readonly InMemoryServiceBusApplicationCacheService _applicationCacheService;
    private readonly IOrganizationRepository _organizationRepository;
    protected readonly ILogger<ApplicationCacheHostedService> _logger;
    private readonly ServiceBusClient _serviceBusClient;
    private readonly ServiceBusReceiver _subscriptionReceiver;
    private readonly ServiceBusAdministrationClient _serviceBusAdministrationClient;
    private readonly string _subName;
    private readonly string _topicName;

    // Both are assigned in StartAsync and stay null if the service was never started.
    private CancellationTokenSource _cts;
    private Task _executingTask;

    /// <summary>
    /// Builds the Service Bus client, receiver and administration client from
    /// <paramref name="globalSettings"/>.
    /// </summary>
    /// <param name="applicationCacheService">
    /// Cache service; only used when it is an <see cref="InMemoryServiceBusApplicationCacheService"/>.
    /// </param>
    /// <param name="organizationRepository">Repository used to load upserted organizations.</param>
    /// <param name="logger">Logger for processing and shutdown errors.</param>
    /// <param name="globalSettings">Source of the Service Bus connection string and topic name.</param>
    public ApplicationCacheHostedService(
        IApplicationCacheService applicationCacheService,
        IOrganizationRepository organizationRepository,
        ILogger<ApplicationCacheHostedService> logger,
        GlobalSettings globalSettings)
    {
        _topicName = globalSettings.ServiceBus.ApplicationCacheTopicName;
        _subName = CoreHelpers.GetApplicationCacheServiceBusSubscriptionName(globalSettings);
        _applicationCacheService = applicationCacheService as InMemoryServiceBusApplicationCacheService;
        _organizationRepository = organizationRepository;
        _logger = logger;
        _serviceBusClient = new ServiceBusClient(globalSettings.ServiceBus.ConnectionString);
        _subscriptionReceiver = _serviceBusClient.CreateReceiver(_topicName, _subName);
        _serviceBusAdministrationClient = new ServiceBusAdministrationClient(globalSettings.ServiceBus.ConnectionString);
    }

    /// <summary>
    /// Creates the subscription (ignoring "already exists") and starts the receive loop
    /// without awaiting it.
    /// </summary>
    /// <param name="cancellationToken">
    /// Token for the subscription creation; the receive loop's token is linked to it.
    /// </param>
    public virtual async Task StartAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _serviceBusAdministrationClient.CreateSubscriptionAsync(
                new CreateSubscriptionOptions(_topicName, _subName)
                {
                    DefaultMessageTimeToLive = TimeSpan.FromDays(14),
                    LockDuration = TimeSpan.FromSeconds(30),
                    EnableDeadLetteringOnFilterEvaluationExceptions = true,
                    DeadLetteringOnMessageExpiration = true,
                },
                new CreateRuleOptions
                {
                    // Excludes messages labelled with this subscription's own name
                    // (presumably the ones this instance published - confirm against the sender).
                    Filter = new SqlRuleFilter($"sys.label != '{_subName}'")
                },
                cancellationToken);
        }
        catch (ServiceBusException e)
        when (e.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
        {
            // The subscription is already there; reuse it.
        }

        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        _executingTask = ExecuteAsync(_cts.Token);
    }

    /// <summary>
    /// Stops the receive loop, closes the Service Bus resources and makes a best-effort
    /// attempt to delete the subscription.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the close and delete calls.</param>
    public virtual async Task StopAsync(CancellationToken cancellationToken)
    {
        // Cancel before closing the receiver so the loop ends through cancellation rather
        // than by faulting on a closed receiver. Null-safe in case StartAsync never ran.
        _cts?.Cancel();

        if (_executingTask != null)
        {
            try
            {
                await _executingTask;
            }
            catch (Exception e)
            {
                // A failed receive loop must not prevent the cleanup below.
                _logger.LogError(e, "Error while stopping the ApplicationCacheHostedService receive loop");
            }
        }

        await _subscriptionReceiver.CloseAsync(cancellationToken);
        await _serviceBusClient.DisposeAsync();

        try
        {
            await _serviceBusAdministrationClient.DeleteSubscriptionAsync(_topicName, _subName, cancellationToken);
        }
        catch (Exception e)
        {
            // Deletion stays best-effort, but the failure is no longer invisible.
            _logger.LogWarning(e, "Failed to delete Service Bus subscription {SubscriptionName} on topic {TopicName}",
                _subName, _topicName);
        }
    }

    /// <summary>
    /// Releases the cancellation token source created in <see cref="StartAsync"/>, if any.
    /// </summary>
    public virtual void Dispose()
    {
        _cts?.Dispose();
        // Cleared so a later StopAsync or Dispose call does not touch a disposed source.
        _cts = null;
    }

    /// <summary>
    /// Receive loop: processes each incoming message and logs (but does not rethrow)
    /// per-message failures. Returns normally when <paramref name="cancellationToken"/>
    /// is cancelled.
    /// </summary>
    private async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        try
        {
            await foreach (var message in _subscriptionReceiver.ReceiveMessagesAsync(cancellationToken))
            {
                try
                {
                    await ProcessMessageAsync(message, cancellationToken);
                }
                catch (Exception e)
                {
                    _logger.LogError(e, "Error processing messages in ApplicationCacheHostedService");
                }
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown was requested; leaving the loop is the expected outcome.
        }
    }

    /// <summary>
    /// Applies a single cache message and then completes it on the subscription.
    /// Messages whose subject equals this subscription's name, and all messages when no
    /// in-memory cache service is available, are completed without being applied.
    /// </summary>
    private async Task ProcessMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
    {
        if (message.Subject != _subName && _applicationCacheService != null)
        {
            switch ((ApplicationCacheMessageType)message.ApplicationProperties["type"])
            {
                case ApplicationCacheMessageType.UpsertOrganizationAbility:
                    var upsertedOrgId = (Guid)message.ApplicationProperties["id"];
                    var upsertedOrg = await _organizationRepository.GetByIdAsync(upsertedOrgId);
                    // The organization may no longer exist by the time the message arrives.
                    if (upsertedOrg != null)
                    {
                        await _applicationCacheService.BaseUpsertOrganizationAbilityAsync(upsertedOrg);
                    }
                    break;
                case ApplicationCacheMessageType.DeleteOrganizationAbility:
                    await _applicationCacheService.BaseDeleteOrganizationAbilityAsync(
                        (Guid)message.ApplicationProperties["id"]);
                    break;
                default:
                    break;
            }
        }

        // Skip completion once shutdown has been requested.
        if (!cancellationToken.IsCancellationRequested)
        {
            await _subscriptionReceiver.CompleteMessageAsync(message, cancellationToken);
        }
    }
}