2021-05-17 09:43:02 -05:00
|
|
|
using System.Collections.Generic;
|
|
|
|
|
using System.Linq;
|
|
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
using Azure.Storage.Queues;
|
2021-07-02 17:11:33 -04:00
|
|
|
using Bit.Core.Utilities;
|
2021-05-17 09:43:02 -05:00
|
|
|
using Microsoft.EntityFrameworkCore.Internal;
|
|
|
|
|
using Newtonsoft.Json;
|
|
|
|
|
|
|
|
|
|
namespace Bit.Core.Services
|
|
|
|
|
{
|
|
|
|
|
public abstract class AzureQueueService<T>
|
|
|
|
|
{
|
|
|
|
|
protected QueueClient _queueClient;
|
|
|
|
|
protected JsonSerializerSettings _jsonSettings;
|
|
|
|
|
|
|
|
|
|
protected AzureQueueService(QueueClient queueClient, JsonSerializerSettings jsonSettings)
|
|
|
|
|
{
|
|
|
|
|
_queueClient = queueClient;
|
|
|
|
|
_jsonSettings = jsonSettings;
|
2021-07-02 17:11:33 -04:00
|
|
|
if (!_jsonSettings.Converters.Any(c => c.GetType() == typeof(EncodedStringConverter)))
|
|
|
|
|
{
|
|
|
|
|
_jsonSettings.Converters.Add(new EncodedStringConverter());
|
|
|
|
|
}
|
2021-05-17 09:43:02 -05:00
|
|
|
}
|
|
|
|
|
|
2021-07-02 17:11:33 -04:00
|
|
|
public async Task CreateAsync(T message) => await CreateManyAsync(new[] { message });
|
2021-05-17 09:43:02 -05:00
|
|
|
|
|
|
|
|
public async Task CreateManyAsync(IEnumerable<T> messages)
|
|
|
|
|
{
|
|
|
|
|
if (messages?.Any() != true)
|
|
|
|
|
{
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
2021-07-02 17:11:33 -04:00
|
|
|
foreach (var json in SerializeMany(messages))
|
2021-05-17 09:43:02 -05:00
|
|
|
{
|
|
|
|
|
await _queueClient.SendMessageAsync(json);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2021-07-02 17:11:33 -04:00
|
|
|
private IEnumerable<string> SerializeMany(IEnumerable<T> messages)
|
2021-05-17 09:43:02 -05:00
|
|
|
{
|
2021-07-02 17:11:33 -04:00
|
|
|
string SerializeMessage(T message) => JsonConvert.SerializeObject(message, _jsonSettings);
|
|
|
|
|
|
2021-05-17 09:43:02 -05:00
|
|
|
var messagesLists = new List<List<T>> { new List<T>() };
|
|
|
|
|
var strings = new List<string>();
|
|
|
|
|
var ListMessageLength = 2; // to account for json array brackets "[]"
|
2021-07-02 17:11:33 -04:00
|
|
|
foreach (var (message, jsonEvent) in messages.Select(m => (m, SerializeMessage(m))))
|
2021-05-17 09:43:02 -05:00
|
|
|
{
|
|
|
|
|
|
|
|
|
|
var messageLength = jsonEvent.Length + 1; // To account for json array comma
|
|
|
|
|
if (ListMessageLength + messageLength > _queueClient.MessageMaxBytes)
|
|
|
|
|
{
|
|
|
|
|
messagesLists.Add(new List<T> { message });
|
|
|
|
|
ListMessageLength = 2 + messageLength;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
messagesLists.Last().Add(message);
|
|
|
|
|
ListMessageLength += messageLength;
|
|
|
|
|
}
|
|
|
|
|
}
|
2021-07-02 17:11:33 -04:00
|
|
|
return messagesLists.Select(l => JsonConvert.SerializeObject(l, _jsonSettings));
|
2021-05-17 09:43:02 -05:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|