diff --git a/ASTSync.csproj b/ASTSync.csproj index 10a821f..93ef456 100644 --- a/ASTSync.csproj +++ b/ASTSync.csproj @@ -7,7 +7,7 @@ - + diff --git a/BatchTableHelper/BatchTable.cs b/BatchTableHelper/BatchTable.cs new file mode 100644 index 0000000..0efa793 --- /dev/null +++ b/BatchTableHelper/BatchTable.cs @@ -0,0 +1,235 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using Azure.Data.Tables; +using Microsoft.Extensions.Logging; + +namespace ASTSync.BatchTableHelper; + +/// +/// BatchTable Helper +/// +/// The purpose of this helper is to queue table transaction actions until the batch size has been hit, +/// and then upload all transactions. Handle all batch errors by combination of re-queues and other error +/// handling. +/// +public class BatchTable : IAsyncDisposable +{ + + /// + /// Max batch size, default 100 + /// + private int _BatchSize { get; set; } + + /// + /// Name of table + /// + private string _TableName { get; set; } + + /// + /// Batch upload task + /// + private Task _batchUploadTask { get; set; } + + /// + /// Queue for batch operations + /// + /// + private ConcurrentQueue _batchActions = new ConcurrentQueue(); + + /// + /// Logger + /// + private ILogger _log { get; set; } + + /// + /// Table client + /// + private TableClient _tableClient { get; set; } + + /// + /// Batch Table + /// + /// Table Connection String + /// Table Name for batches + /// Size of batches + /// Logger for logging + public BatchTable(string ConnectionString, string TableName, int batchSize, ILogger logger) + { + + // Establish service client + var serviceClient = new TableServiceClient(ConnectionString); + + // Create required table + serviceClient.CreateTableIfNotExists(TableName); + + // Set vars + _log = logger; + _BatchSize = batchSize; + _TableName = TableName; + + _tableClient = new TableClient(ConnectionString, TableName); + } + + /// + /// Trigger upload task if one is not running + /// + private void TriggerUploadTask() + { + + if (_batchUploadTask is not null && _batchUploadTask.IsFaulted) + { + _log.LogError($"batchUploadTask for table name {_TableName} has faulted: {_batchUploadTask.Exception}"); + } + + if (_batchUploadTask is null || _batchUploadTask.IsCompleted) + _batchUploadTask = Task.Run(() => BatchUploadAsync()); + } + + /// + /// Flush all remaining items in batch, or until timeout + /// + /// Timeout when task should return + /// + public async Task FlushBatchAsync(TimeSpan? timeout) + { + + // Default to 5 minute timeout + if (timeout is null) + timeout = TimeSpan.FromMinutes(5); + + // Stop watch for timing dispose time + Stopwatch sw = Stopwatch.StartNew(); + + // If batch actions is not empty, continue to attempt to upload for 1 minute. + while (!_batchActions.IsEmpty && sw.Elapsed < timeout) + { + TriggerUploadTask(); + + // Delay task a second to allow task to complete, prevents locking thread + await Task.Delay(TimeSpan.FromSeconds(1)); + } + + + // If batch is still not empty, then log an error + if (!_batchActions.IsEmpty) + _log.LogError( + $"TableBatchHelper instructed to upload now for {_TableName} has exceeded timeout with batch items left in the queue. This is most likely due to; items that could not be uploaded (invalid) or uploads taking longer than a {timeout} to complete."); + + // Return if actions is empty + return _batchActions.IsEmpty; + } + + /// + /// Perform batch upload + /// + /// + private async Task BatchUploadAsync() + { + List BatchTransactions = new List(); + + // Used to re-queue transactions that cannot be put in this batch + // Such as transactions with a row key that is already present in the batch (cannot perform within the same batch) + + List RequeueTransactions = new List(); + + // Take items out of the queue until it's empty or the max batch size hit + while (!_batchActions.IsEmpty && BatchTransactions.Count < _BatchSize) + { + TableTransactionAction dequeued; + + if (_batchActions.TryDequeue(out dequeued)) + { + // Validate row key is not already in batch transactions + // Batches cannot contain two transactions for the same partition key and row. + + if (BatchTransactions.Any(x => + x.Entity.PartitionKey == dequeued.Entity.PartitionKey && + x.Entity.RowKey == dequeued.Entity.RowKey)) + { + // Requeue the transaction for next batch as it is already existing in this batch + RequeueTransactions.Add(dequeued); + } + else + { + BatchTransactions.Add(dequeued); + } + + } + + } + + if (BatchTransactions.Any()) + { + try + { + await _tableClient.SubmitTransactionAsync(BatchTransactions); + } + catch (TableTransactionFailedException e) + { + List failedBatch = BatchTransactions.ToList(); + + _log.LogError($"Failed to insert batch transaction in {_tableClient.Name} with partition key {failedBatch[e.FailedTransactionActionIndex.Value].Entity.PartitionKey} row key {failedBatch[e.FailedTransactionActionIndex.Value].Entity.RowKey} {e.Message}"); + + // Remove the failing item from the batch and requeue rest + failedBatch.RemoveAt(e.FailedTransactionActionIndex.Value); + foreach (TableTransactionAction action in failedBatch) + { + _batchActions.Enqueue(action); + } + } + } + + // Requeue transactions that need to be moved to another batch + if (RequeueTransactions.Any()) + { + foreach(var transaction in RequeueTransactions) + _batchActions.Enqueue(transaction); + } + + return true; + } + + /// + /// Enqueue and upload when hits max size + /// + /// + public void EnqueueUpload(TableTransactionAction action) + { + // Enqueue + _batchActions.Enqueue(action); + + // Run upload if > batch size + if (_batchActions.Count >= _BatchSize) + TriggerUploadTask(); + } + + /// + /// Batch upload and dispose. + /// + /// This will attempt to flush the queue in 1 minute. + /// + public async ValueTask DisposeAsync() + { + // Stop watch for timing dispose time + Stopwatch sw = Stopwatch.StartNew(); + + // If batch actions is not empty, continue to attempt to upload for 1 minute. + while (!_batchActions.IsEmpty && sw.Elapsed < TimeSpan.FromMinutes(1)) + { + TriggerUploadTask(); + + // Delay task a second to allow task to complete, prevents locking thread + await Task.Delay(TimeSpan.FromSeconds(1)); + } + + // If batch is still not empty, then log an error + if (!_batchActions.IsEmpty) + _log.LogError( + $"TableBatchHelper for {_TableName} has been disposed of with batch items left in the queue. This is most likely due to; items that could not be uploaded (invalid) or uploads taking longer than a minute to complete."); + + } +} \ No newline at end of file diff --git a/Models/UserSimulationDetailsCollectionResponse.cs b/Models/UserSimulationDetailsCollectionResponse.cs new file mode 100644 index 0000000..228f537 --- /dev/null +++ b/Models/UserSimulationDetailsCollectionResponse.cs @@ -0,0 +1,57 @@ +using Microsoft.Kiota.Abstractions.Serialization; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System; + +/* + * + * This is here because it went missing in Microsoft.Graph.Beta 5.28.0-preview + * + * https://github.com/microsoftgraph/msgraph-sdk-dotnet/issues/2524 + * + */ + +namespace Microsoft.Graph.Beta.Models { + public class UserSimulationDetailsCollectionResponse : BaseCollectionPaginationCountResponse, IParsable { + /// The value property +#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_1_OR_GREATER +#nullable enable + public List? Value { + get { return BackingStore?.Get?>("value"); } + set { BackingStore?.Set("value", value); } + } +#nullable restore +#else + public List Value { + get { return BackingStore?.Get>("value"); } + set { BackingStore?.Set("value", value); } + } +#endif + /// + /// Creates a new instance of the appropriate class based on discriminator value + /// + /// The parse node to use to read the discriminator value and create the object + public static new UserSimulationDetailsCollectionResponse CreateFromDiscriminatorValue(IParseNode parseNode) { + _ = parseNode ?? throw new ArgumentNullException(nameof(parseNode)); + return new UserSimulationDetailsCollectionResponse(); + } + /// + /// The deserialization information for the current model + /// + public new IDictionary> GetFieldDeserializers() { + return new Dictionary>(base.GetFieldDeserializers()) { + {"value", n => { Value = n.GetCollectionOfObjectValues(UserSimulationDetails.CreateFromDiscriminatorValue)?.ToList(); } }, + }; + } + /// + /// Serializes information the current object + /// + /// Serialization writer to use to serialize this model + public new void Serialize(ISerializationWriter writer) { + _ = writer ?? throw new ArgumentNullException(nameof(writer)); + base.Serialize(writer); + writer.WriteCollectionOfObjectValues("value", Value); + } + } +} diff --git a/README.md b/README.md index ebbbd7f..d7a1bd0 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,17 @@ The purpose of this function app is to synchronise Attack Simulation Training to Azure Table storage. +## What is stored? + +The following API methods are pulled, flattened, and stored in to Azure Table Storage: +* graph.microsoft.com/beta/security/attackSimulation/simulations -> Simulations Table +* graph.microsoft.com/beta/security/attackSimulation/simulations/{id}/reports/simulationUsers -> SimulationsUsers (stores a row for every user in the simulation) and SimulationUserEvents Table (stores all events, such as click/report, etc.) +* graph.microsoft.com/beta/security/attackSimulation/payloads -> Payloads +* graph.microsoft.com/beta/security/attackSimulation/training -> Trainings + +## Installation + +Whilst the code here could be adjusted to suit any means, it is intended to run in an Azure Function App. + +1. Create a new Azure Function App. Use the default options, and do not set up deployment at this stage unless you want to fork this repository. +2. \ No newline at end of file diff --git a/Sync.cs b/Sync.cs index 6f379c6..568ce63 100644 --- a/Sync.cs +++ b/Sync.cs @@ -2,9 +2,8 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; -using System.Linq; -using System.Threading; using System.Threading.Tasks; +using ASTSync.BatchTableHelper; using Azure.Data.Tables; using Azure.Identity; using Microsoft.Azure.WebJobs; @@ -28,217 +27,135 @@ public static class Sync private static int _maxTableBatchSize = 100; /// - /// Queue for Simulation Table entries, async batch/bulk processed + /// How frequent to sync users (default 7 days) /// - private static ConcurrentQueue tableActionQueue_Simulations = new(); + private static TimeSpan _ageUserSync = TimeSpan.FromDays(7); + + /// + /// Maintains a list of users we have already synced + /// + private static ConcurrentDictionary userListSynced = new(); /// - /// Queue for Simulation User Table entries, async batch/bulk processed + /// Logger /// - private static ConcurrentQueue tableActionQueue_SimulationUsers = new(); + private static ILogger _log { get; set; } /// - /// Queue for Simulation User Events Table entries, async batch/bulk processed + /// Batch table processor for Entra Users /// - private static ConcurrentQueue tableActionQueue_SimulationUserEvents = new(); + private static BatchTable _batchUsers { get; set; } /// - /// Queue for User entries, async batch/bulk processed + /// Batch table processor for Simulations /// - private static ConcurrentQueue tableActionQueue_Users = new(); - + private static BatchTable _batchSimulations { get; set; } + /// - /// Maintains a list of users we have already synced + /// Batch table processor for Simulation Users /// - private static ConcurrentDictionary userListSynced = new(); + private static BatchTable _batchSimulationUsers { get; set; } /// - /// Logger + /// Batch table processor for Simulation User Events /// - private static ILogger _log { get; set; } + private static BatchTable _batchSimulationUserEvents { get; set; } + + /// + /// Batch table processor for Trainings + /// + private static BatchTable _batchTrainings { get; set; } + + /// + /// Batch table processor for Payloads + /// + private static BatchTable _batchPayloads { get; set; } [FunctionName("Sync")] public static async Task RunAsync([TimerTrigger("0 */15 * * * *")] TimerInfo myTimer, ILogger log) { - + + Stopwatch sw = Stopwatch.StartNew(); + _log = log; // Get graph client var GraphClient = GetGraphServicesClient(); - _log.LogInformation($"C# Timer trigger function executed at: {DateTime.UtcNow}"); - - await CreateRequiredTables(); + _log.LogInformation($"AST Sync started at: {DateTime.UtcNow}"); - // Spin up the batch queue processor - CancellationTokenSource batchQueueProcessorTokenSource = new CancellationTokenSource(); - Task taskBatchQueueProcessor = Task.Run(() => BatchQueueProcessor(batchQueueProcessorTokenSource.Token)); + // Spin up the batch queue processors + _batchUsers = new BatchTable(GetStorageConnection(), "Users", _maxTableBatchSize, log); + _batchSimulations = new BatchTable(GetStorageConnection(), "Simulations", _maxTableBatchSize, log); + _batchSimulationUsers = new BatchTable(GetStorageConnection(), "SimulationUsers", _maxTableBatchSize, log); + _batchSimulationUserEvents = new BatchTable(GetStorageConnection(), "SimulationUserEvents", _maxTableBatchSize, log); + _batchTrainings = new BatchTable(GetStorageConnection(), "Trainings", _maxTableBatchSize, log); + _batchPayloads = new BatchTable(GetStorageConnection(), "Payloads", _maxTableBatchSize, log); // Sync Tenant Simulations to perform sync whilst sync'ing simulations to table // This can probably be moved to an async foreach below. - - var simulationIds = await GetTenantSimulations(GraphClient); + // However, need to figure out how to do the async foreach return in a lambda (graph services client paging func.) - // Sync Tenant Simulation Users - foreach (string id in simulationIds) + HashSet simulationIds; + + try { - // Throw if taskBatchQueueProcessor is faulted - if (taskBatchQueueProcessor.IsFaulted) - throw new Exception($"taskBatchQueueProcessor has faulted: {taskBatchQueueProcessor.Exception}"); - - // Run get simulation users - await GetTenantSimulationUsers(GraphClient, id); + simulationIds = await GetTenantSimulations(GraphClient); } - - // Wait for batch queue processor to complete - while (!taskBatchQueueProcessor.IsCompleted) + catch (Exception e) { - // Signal to batch queue processor work has completed - batchQueueProcessorTokenSource.Cancel(); - - // Sleep for a second - await Task.Delay(1000); + _log.LogError($"Failed to get simulations: {e}"); + throw; } - } - - /// - /// Processes the queues - /// - private static async Task BatchQueueProcessor(CancellationToken cancellationToken) - { - while (!cancellationToken.IsCancellationRequested) - { - // Tasks for submission tasks - List SubmissionTasks = new List(); - // Process Simulations Queue - if (tableActionQueue_Simulations.Any()) - SubmissionTasks.Add(Task.Run(() => BatchQueueProcess(tableActionQueue_Simulations, - new TableClient(GetStorageConnection(), "Simulations"), cancellationToken))); - // Process SimulationUsers Queue - if (tableActionQueue_SimulationUsers.Any()) - SubmissionTasks.Add(Task.Run(() => BatchQueueProcess(tableActionQueue_SimulationUsers, - new TableClient(GetStorageConnection(), "SimulationUsers"), cancellationToken))); - - // Process SimulationUserEvents Queue - if (tableActionQueue_SimulationUserEvents.Any()) - SubmissionTasks.Add(Task.Run(() => BatchQueueProcess(tableActionQueue_SimulationUserEvents, - new TableClient(GetStorageConnection(), "SimulationUserEvents"), cancellationToken))); + // Sync Tenant Simulation Users + foreach (string id in simulationIds) + { + try + { + await GetTenantSimulationUsers(GraphClient, id); + } + catch (Exception e) + { + _log.LogError($"Failed to get simulation users for simulation {id}: {e}"); + } - // Process Users Queue - if (tableActionQueue_Users.Any()) - SubmissionTasks.Add(Task.Run(() => BatchQueueProcess(tableActionQueue_Users, - new TableClient(GetStorageConnection(), "Users"), cancellationToken))); - - // Wait for all submission tasks to complete - await Task.WhenAll(SubmissionTasks); - - // Sleep for ten seconds - await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken); } + - // Flush all queues if not empty - while (!tableActionQueue_SimulationUsers.IsEmpty && !tableActionQueue_SimulationUserEvents.IsEmpty && - !tableActionQueue_SimulationUserEvents.IsEmpty && !tableActionQueue_Users.IsEmpty) + // Remaining syncs + try { - await BatchQueueProcess(tableActionQueue_Simulations, - new TableClient(GetStorageConnection(), "Simulations"), cancellationToken); - await BatchQueueProcess(tableActionQueue_SimulationUsers, - new TableClient(GetStorageConnection(), "SimulationUsers"), cancellationToken); - await BatchQueueProcess(tableActionQueue_SimulationUserEvents, - new TableClient(GetStorageConnection(), "SimulationUserEvents"), cancellationToken); - await BatchQueueProcess(tableActionQueue_Users, - new TableClient(GetStorageConnection(), "Users"), cancellationToken); + await GetTrainings(GraphClient); } - - } - - /// - /// Processes a queue - /// - private static async Task BatchQueueProcess(ConcurrentQueue Queue, TableClient tableClient, CancellationToken ct) - { - List BatchTransactions = new List(); - - // Used to re-queue transactions that cannot be put in this batch - // Such as transactions with a row key that is already present in the batch (cannot perform within the same batch) - - List RequeueTransactions = new List(); - - // Take items out of the queue until it's empty or the max batch size hit - while (!Queue.IsEmpty && BatchTransactions.Count < _maxTableBatchSize) + catch (Exception e) { - TableTransactionAction dequeued; - - if (Queue.TryDequeue(out dequeued)) - { - // Validate row key is not already in batch transactions - // Batches cannot contain two transactions for the same partition key and row. - - if (BatchTransactions.Any(x => - x.Entity.PartitionKey == dequeued.Entity.PartitionKey && - x.Entity.RowKey == dequeued.Entity.RowKey)) - { - // Requeue the transaction for next batch as it is already existing in this batch - RequeueTransactions.Add(dequeued); - } - else - { - BatchTransactions.Add(dequeued); - } - - } - + _log.LogError($"Failed to get trainings: {e}"); } - - if (BatchTransactions.Any()) + + try { - // Submit the transactions - _log.LogInformation($"Uploading batch to {tableClient.Name} of size {BatchTransactions.Count}"); - - try - { - await tableClient.SubmitTransactionAsync(BatchTransactions); - } - catch (TableTransactionFailedException e) - { - List failedBatch = BatchTransactions.ToList(); - - _log.LogError($"Failed to insert batch transaction in {tableClient.Name} with partition key {failedBatch[e.FailedTransactionActionIndex.Value].Entity.PartitionKey} row key {failedBatch[e.FailedTransactionActionIndex.Value].Entity.RowKey} {e.Message}"); - - // Remove the failing item from the batch and requeue rest - failedBatch.RemoveAt(e.FailedTransactionActionIndex.Value); - foreach (TableTransactionAction action in failedBatch) - { - Queue.Enqueue(action); - } - } + await GetPayloads(GraphClient); } - - // Requeue transactions - if (RequeueTransactions.Any()) + catch (Exception e) { - foreach(var transaction in RequeueTransactions) - Queue.Enqueue(transaction); + _log.LogError($"Failed to get payloads: {e}"); } - } - - /// - /// Create required tables - /// - private static async Task CreateRequiredTables() - { - // Establish service client - var serviceClient = new TableServiceClient(GetStorageConnection()); + + // Dispose of all batch processors + await _batchUsers.DisposeAsync(); + await _batchSimulations.DisposeAsync(); + await _batchSimulationUsers.DisposeAsync(); + await _batchSimulationUserEvents.DisposeAsync(); + await _batchTrainings.DisposeAsync(); + await _batchPayloads.DisposeAsync(); + + _log.LogInformation($"AST sync completed synchronising {simulationIds.Count} simulations in {sw.Elapsed}"); - // Create required tables - await serviceClient.CreateTableIfNotExistsAsync("Simulations"); - await serviceClient.CreateTableIfNotExistsAsync("SimulationUsers"); - await serviceClient.CreateTableIfNotExistsAsync("SimulationUserEvents"); - await serviceClient.CreateTableIfNotExistsAsync("Users"); } + /// /// Get Simulations for Tenant /// @@ -289,7 +206,7 @@ private static async Task> GetTenantSimulations(GraphServiceClie } // Add the table item - tableActionQueue_Simulations.Enqueue(new TableTransactionAction(TableTransactionActionType.UpdateReplace, new TableEntity("Simulations", sim.Id) + _batchSimulations.EnqueueUpload(new TableTransactionAction(TableTransactionActionType.UpdateReplace, new TableEntity("Simulations", sim.Id) { {"AttackTechnique", sim.AttackTechnique.ToString()}, {"AttackType", sim.AttackType.ToString()}, @@ -320,9 +237,133 @@ private static async Task> GetTenantSimulations(GraphServiceClie await pageIterator.IterateAsync(); + // Flush batch simulations + await _batchSimulations.FlushBatchAsync(TimeSpan.FromMinutes(5)); + return SimulationIds; } + /// + /// Get Trainings + /// + /// + private static async Task GetTrainings(GraphServiceClient GraphClient) + { + + Stopwatch sw = Stopwatch.StartNew(); + _log.LogInformation("Synchronising trainings"); + + // Get simulation results + var results = await GraphClient + .Security + .AttackSimulation + .Trainings + .GetAsync((requestConfiguration) => + { + requestConfiguration.QueryParameters.Top = 1000; + }); + + var pageIterator = Microsoft.Graph.PageIterator + .CreatePageIterator(GraphClient, results, async (training) => + { + + // Add the table item + _batchTrainings.EnqueueUpload(new TableTransactionAction(TableTransactionActionType.UpdateReplace, new TableEntity("Trainings", training.Id) + { + {"TrainingId", training.Id}, + {"DisplayName", training.DisplayName}, + {"Description", training.Description}, + {"DurationInMinutes", training.DurationInMinutes}, + {"Source", training.Source.ToString()}, + {"Type", training.Type?.ToString()}, + {"availabilityStatus", training.AvailabilityStatus?.ToString()}, + {"HasEvaluation", training.HasEvaluation}, + {"CreatedBy_Id", training.CreatedBy?.Id}, + {"CreatedBy_DisplayName", training.CreatedBy?.DisplayName}, + {"CreatedBy_Email", training.CreatedBy?.Email}, + {"LastModifiedBy_Id", training.LastModifiedBy?.Id}, + {"LastModifiedBy_DisplayName", training.LastModifiedBy?.DisplayName}, + {"LastModifiedBy_Email", training.LastModifiedBy?.Email}, + {"LastModifiedDateTime", training.LastModifiedDateTime}, + })); + + return true; + }); + + await pageIterator.IterateAsync(); + + // Flush remaining trainings + await _batchTrainings.FlushBatchAsync(TimeSpan.FromMinutes(5)); + + _log.LogInformation($"Synchronising trainings complete in {sw.Elapsed}"); + + return true; + } + + /// + /// Get Payloads + /// + /// + private static async Task GetPayloads(GraphServiceClient GraphClient) + { + + Stopwatch sw = Stopwatch.StartNew(); + _log.LogInformation("Synchronising payloads"); + + // Get simulation results + var results = await GraphClient + .Security + .AttackSimulation + .Payloads + .GetAsync((requestConfiguration) => + { + requestConfiguration.QueryParameters.Top = 1000; + }); + + var pageIterator = Microsoft.Graph.PageIterator + .CreatePageIterator(GraphClient, results, async (payload) => + { + + // Add the table item + _batchPayloads.EnqueueUpload(new TableTransactionAction(TableTransactionActionType.UpdateReplace, new TableEntity("Payloads", payload.Id) + { + {"PayloadId", payload.Id}, + {"DisplayName", payload.DisplayName}, + {"Description", payload.Description}, + {"SimulationAttackType", payload.SimulationAttackType?.ToString()}, + {"Platform", payload.Platform?.ToString()}, + {"Status", payload.Status?.ToString()}, + {"Source", payload.Source?.ToString()}, + {"PredictedCompromiseRate", payload.PredictedCompromiseRate}, + {"Complexity", payload.Complexity?.ToString()}, + {"Technique", payload.Technique?.ToString()}, + {"Theme", payload.Theme?.ToString()}, + {"Brand", payload.Brand?.ToString()}, + {"Industry", payload.Industry?.ToString()}, + {"IsCurrentEvent", payload.IsCurrentEvent}, + {"IsControversial", payload.IsControversial}, + {"CreatedBy_Id", payload.CreatedBy?.Id}, + {"CreatedBy_DisplayName", payload.CreatedBy?.DisplayName}, + {"CreatedBy_Email", payload.CreatedBy?.Email}, + {"LastModifiedBy_Id", payload.LastModifiedBy?.Id}, + {"LastModifiedBy_DisplayName", payload.LastModifiedBy?.DisplayName}, + {"LastModifiedBy_Email", payload.LastModifiedBy?.Email}, + {"LastModifiedDateTime", payload.LastModifiedDateTime}, + })); + + return true; + }); + + await pageIterator.IterateAsync(); + + // Flush remaining payloads + await _batchPayloads.FlushBatchAsync(TimeSpan.FromMinutes(5)); + + _log.LogInformation($"Synchronising payloads complete in {sw.Elapsed}"); + + return true; + } + /// /// Get Simulations Users /// @@ -347,7 +388,7 @@ private static async Task GetTenantSimulationUsers(GraphServiceClient GraphClien string id = $"{SimulationId}-{userSimDetail.SimulationUser?.UserId}"; // Add the table item - tableActionQueue_SimulationUsers.Enqueue(new TableTransactionAction(TableTransactionActionType.UpdateReplace, new TableEntity(SimulationId, userSimDetail.SimulationUser?.UserId) + _batchSimulationUsers.EnqueueUpload(new TableTransactionAction(TableTransactionActionType.UpdateReplace, new TableEntity(SimulationId, userSimDetail.SimulationUser?.UserId) { {"SimulationUser_Id", id}, {"SimulationId", SimulationId}, @@ -373,7 +414,7 @@ private static async Task GetTenantSimulationUsers(GraphServiceClient GraphClien { foreach (var simulationUserEvents in userSimDetail.SimulationEvents) { - tableActionQueue_SimulationUserEvents.Enqueue(new TableTransactionAction(TableTransactionActionType.UpdateReplace, new TableEntity(SimulationId, $"{userSimDetail.SimulationUser?.UserId}_{simulationUserEvents.EventName}_{simulationUserEvents.EventDateTime.Value.ToUnixTimeSeconds()}") + _batchSimulationUserEvents.EnqueueUpload(new TableTransactionAction(TableTransactionActionType.UpdateReplace, new TableEntity(SimulationId, $"{userSimDetail.SimulationUser?.UserId}_{simulationUserEvents.EventName}_{simulationUserEvents.EventDateTime.Value.ToUnixTimeSeconds()}") { {"SimulationUser_Id", id}, {"SimulationUser_UserId", userSimDetail.SimulationUser?.UserId}, @@ -394,18 +435,19 @@ private static async Task GetTenantSimulationUsers(GraphServiceClient GraphClien await pageIterator.IterateAsync(); // update in the Simulations table that this has been syncd - tableActionQueue_Simulations.Enqueue(new TableTransactionAction(TableTransactionActionType.UpsertMerge, new TableEntity("Simulations", SimulationId) + _batchSimulations.EnqueueUpload(new TableTransactionAction(TableTransactionActionType.UpsertMerge, new TableEntity("Simulations", SimulationId) { {"LastUserSync", DateTime.UtcNow}, })); - _log.LogInformation($"Full user synchronisation of {SimulationId} completed in {sw.Elapsed}"); + // Flush batch simulations for users and events + await _batchSimulationUsers.FlushBatchAsync(TimeSpan.FromMinutes(5)); + await _batchSimulationUserEvents.FlushBatchAsync(TimeSpan.FromMinutes(5)); - // Perform a task delay here to allow other threads to complete - await Task.Delay(1000); + _log.LogInformation($"Full user synchronisation of {SimulationId} completed in {sw.Elapsed}"); } - + /// /// Get the Graph Client for Tenant /// @@ -426,7 +468,7 @@ private static GraphServiceClient GetGraphServicesClient() /// Get Storage Connection from App settings /// /// - public static string GetStorageConnection() => Environment.GetEnvironmentVariable("AzureWebJobsStorage", EnvironmentVariableTarget.Process); + private static string GetStorageConnection() => Environment.GetEnvironmentVariable("AzureWebJobsStorage", EnvironmentVariableTarget.Process); /// /// Synchronise user @@ -444,7 +486,7 @@ private static async Task SyncUser(GraphServiceClient GraphClient, string id) if (User is not null) { - tableActionQueue_Users.Enqueue(new TableTransactionAction(TableTransactionActionType.UpdateReplace, new TableEntity("Users", id) + _batchUsers.EnqueueUpload(new TableTransactionAction(TableTransactionActionType.UpdateReplace, new TableEntity("Users", id) { {"DisplayName", User.DisplayName}, {"GivenName", User.GivenName}, @@ -464,10 +506,10 @@ private static async Task SyncUser(GraphServiceClient GraphClient, string id) } catch (ODataError e) { - if (e.Error.Code == "Request_ResourceNotFound") + if (e.Error is not null && e.Error.Code == "Request_ResourceNotFound") { - // User no longer exists - tableActionQueue_Users.Enqueue(new TableTransactionAction(TableTransactionActionType.UpsertMerge, new TableEntity("Users", id) + // User no longer exists, update table entity + _batchUsers.EnqueueUpload(new TableTransactionAction(TableTransactionActionType.UpsertMerge, new TableEntity("Users", id) { {"Exists", "false"}, {"LastUserSync", DateTime.UtcNow}, @@ -483,6 +525,8 @@ private static async Task SyncUser(GraphServiceClient GraphClient, string id) /// /// Determine if should sync user + /// + /// This prevents continously syncing the user /// /// /// @@ -506,7 +550,7 @@ private static async Task ShouldSyncUser(string id) LastUserSync = DateTime.SpecifyKind(DateTime.Parse(UserTableItem.Value["LastUserSync"].ToString()), DateTimeKind.Utc); // If no sync or days is older than a week - if (LastUserSync < DateTime.UtcNow.AddDays(-7)) + if (LastUserSync < DateTime.UtcNow.Subtract(_ageUserSync)) return true; // Add to userSyncList so we don't need to check again