Files
server/src/Core/Services/Implementations/AzureQueueService.cs

73 lines
2.4 KiB
C#
Raw Normal View History

Support large organization sync (#1311) * Increase organization max seat size from 30k to 2b (#1274) * Increase organization max seat size from 30k to 2b * PR review. Do not modify unless state matches expected * Organization sync simultaneous event reporting (#1275) * Split up azure messages according to max size * Allow simultaneous login of organization user events * Early resolve small event lists * Clarify logic Co-authored-by: Chad Scharf <3904944+cscharf@users.noreply.github.com> * Improve readability This comes at the cost of multiple serializations, but the improvement in wire-time should more than make up for this on message where serialization time matters Co-authored-by: Chad Scharf <3904944+cscharf@users.noreply.github.com> * Queue emails (#1286) * Extract common Azure queue methods * Do not use internal entity framework namespace * Prefer IEnumerable to IList unless needed All of these implementations were just using `Count == 1`, which is easily replicated. This will be used when abstracting Azure queues * Add model for azure queue message * Abstract Azure queue for reuse * Creat service to enqueue mail messages for later processing Azure queue mail service uses Azure queues. Blocking just blocks until all the work is done -- This is how emailing works today * Provide mail queue service to DI * Queue organization invite emails for later processing All emails can later be added to this queue * Create Admin hosted service to process enqueued mail messages * Prefer constructors to static generators * Mass delete organization users (#1287) * Add delete many to Organization Users * Correct formatting * Remove erroneous migration * Clarify parameter name * Formatting fixes * Simplify bump account revision sproc * Formatting fixes * Match file names to objects * Indicate if large import is expected * Early pull all existing users we were planning on inviting (#1290) * Early pull all existing users we were planning on inviting * Improve sproc name * Batch upsert org users (#1289) * Add UpsertMany sprocs to OrganizationUser * Add method to create TVPs from any object. Uses DbOrder attribute to generate. Sproc will fail unless TVP column order matches that of the db type * Combine migrations * Correct formatting * Include sql objects in sql project * Keep consisten parameter names * Batch deletes for performance * Correct formatting * consolidate migrations * Use batch methods in OrganizationImport * Declare @BatchSize * Transaction names limited to 32 chars Drop sproc before creating it if it exists * Update import tests * Allow for more users in org upgrades * Fix formatting * Improve class hierarchy structure * Use name tuple types * Fix formatting * Front load all reflection * Format constructor * Simplify ToTvp as class-specific extension Co-authored-by: Chad Scharf <3904944+cscharf@users.noreply.github.com>
2021-05-17 09:43:02 -05:00
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using IdentityServer4.Extensions;
using Microsoft.EntityFrameworkCore.Internal;
using Newtonsoft.Json;
namespace Bit.Core.Services
{
public abstract class AzureQueueService<T>
{
protected QueueClient _queueClient;
protected JsonSerializerSettings _jsonSettings;
protected AzureQueueService(QueueClient queueClient, JsonSerializerSettings jsonSettings)
{
_queueClient = queueClient;
_jsonSettings = jsonSettings;
}
public async Task CreateAsync(T message)
{
var json = JsonConvert.SerializeObject(message, _jsonSettings);
await _queueClient.SendMessageAsync(json);
}
public async Task CreateManyAsync(IEnumerable<T> messages)
{
if (messages?.Any() != true)
{
return;
}
if (!messages.Skip(1).Any())
{
await CreateAsync(messages.First());
return;
}
foreach (var json in SerializeMany(messages, _jsonSettings))
{
await _queueClient.SendMessageAsync(json);
}
}
protected IEnumerable<string> SerializeMany(IEnumerable<T> messages, JsonSerializerSettings jsonSettings)
{
var messagesLists = new List<List<T>> { new List<T>() };
var strings = new List<string>();
var ListMessageLength = 2; // to account for json array brackets "[]"
foreach (var (message, jsonEvent) in messages.Select(e => (e, JsonConvert.SerializeObject(e, jsonSettings))))
{
var messageLength = jsonEvent.Length + 1; // To account for json array comma
if (ListMessageLength + messageLength > _queueClient.MessageMaxBytes)
{
messagesLists.Add(new List<T> { message });
ListMessageLength = 2 + messageLength;
}
else
{
messagesLists.Last().Add(message);
ListMessageLength += messageLength;
}
}
return messagesLists.Select(l => JsonConvert.SerializeObject(l, jsonSettings));
}
}
}