Get rid of the mail item queue system. Go back to 6 months initial sync strategy.

This commit is contained in:
Burak Kaan Köse
2025-11-01 12:11:05 +01:00
parent 5186b14905
commit b60832a270
18 changed files with 356 additions and 763 deletions
+109 -320
View File
@@ -51,23 +51,22 @@ public partial class OutlookSynchronizerJsonContext : JsonSerializerContext;
/// <summary>
/// Outlook synchronizer implementation with queue-based metadata-only synchronization.
/// Outlook synchronizer implementation with delta token synchronization.
///
/// SYNCHRONIZATION STRATEGY:
/// - Uses per-folder queue system (unlike Gmail's per-account queue)
/// - During sync (initial/delta), only message metadata is downloaded (no MIME content)
/// - Messages are queued by folder using MailItemQueue with RemoteFolderId
/// - MailCopy objects are created from Graph API metadata fields only
/// - Uses delta API for both initial and incremental sync
/// - Initial sync: Downloads last 6 months of emails with metadata only
/// - Incremental sync: Uses delta token to get only changes since last sync
/// - Messages are downloaded with metadata only (no MIME content during sync)
/// - MIME files are downloaded on-demand when user explicitly reads a message
/// - This dramatically reduces bandwidth usage and sync time
///
/// Key implementation details:
/// - QueueMailIdsForFolderAsync: Queues all mail IDs for a folder using Delta API
/// - ProcessMailQueueForFolderAsync: Downloads metadata in batches from queue
/// - DownloadMessageMetadataBatchAsync: Concurrently downloads metadata for batches
/// - CreateMailCopyFromMessage: Centralized method to create MailCopy from Message (metadata only)
/// - SynchronizeFolderAsync: Main entry point for per-folder synchronization
/// - DownloadMailsForInitialSyncAsync: Downloads last 6 months using delta API with filter
/// - ProcessDeltaChangesAsync: Processes incremental changes using delta token
/// - DownloadMessageMetadataBatchAsync: Downloads metadata in batches using Graph batch API
/// - CreateMailCopyFromMessageAsync: Creates MailCopy from Message metadata
/// - DownloadMissingMimeMessageAsync: Downloads raw MIME only when explicitly requested
/// - CreateNewMailPackagesAsync: Only used for search results and special cases (downloads MIME)
/// </summary>
public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message, Event>
{
@@ -227,26 +226,22 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
cancellationToken.ThrowIfCancellationRequested();
_logger.Debug("Synchronizing {FolderName} with direct download approach", folder.FolderName);
_logger.Debug("Synchronizing {FolderName} using delta API", folder.FolderName);
// Check if initial sync is completed for this folder
if (folder.FolderStatus != InitialSynchronizationStatus.Completed)
// Check if we have a delta token
if (string.IsNullOrEmpty(folder.DeltaToken))
{
_logger.Debug("Initial sync not completed for folder {FolderName}. Starting mail synchronization.", folder.FolderName);
_logger.Debug("No delta token for folder {FolderName}. Starting initial sync (last 30 days).", folder.FolderName);
// Download mails for initial sync
// Download mails for initial sync (last 6 months)
await DownloadMailsForInitialSyncAsync(folder, downloadedMessageIds, cancellationToken).ConfigureAwait(false);
// Mark initial sync as completed
await _outlookChangeProcessor.UpdateFolderInitialSyncCompletedAsync(folder.Id, true).ConfigureAwait(false);
folder.FolderStatus = InitialSynchronizationStatus.Completed;
}
else
{
// Initial sync is completed, process delta changes and download new mails
_logger.Debug("Initial sync completed for folder {FolderName}. Processing delta changes and downloading new mails.", folder.FolderName);
// Initial sync is completed, process delta changes
_logger.Debug("Delta token exists for folder {FolderName}. Processing incremental changes.", folder.FolderName);
await ProcessDeltaChangesAndDownloadMailsAsync(folder, downloadedMessageIds, cancellationToken).ConfigureAwait(false);
await ProcessDeltaChangesAsync(folder, downloadedMessageIds, cancellationToken).ConfigureAwait(false);
}
await _outlookChangeProcessor.UpdateFolderLastSyncDateAsync(folder.Id).ConfigureAwait(false);
@@ -260,24 +255,107 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
}
/// <summary>
/// Downloads mails for initial synchronization using Delta API and queue-based system.
/// First, queues all mail IDs, then downloads metadata in batches.
/// Downloads mails for initial synchronization using Delta API with a 6-month filter.
/// Downloads metadata only (no MIME content) for messages received in the last 6 months.
/// </summary>
private async Task DownloadMailsForInitialSyncAsync(MailItemFolder folder, List<string> downloadedMessageIds, CancellationToken cancellationToken)
{
_logger.Debug("Starting initial mail download for folder {FolderName}", folder.FolderName);
_logger.Debug("Starting initial mail download for folder {FolderName} (last 6 months)", folder.FolderName);
try
{
// Step 1: Queue all mail IDs using Delta API
await QueueMailIdsForFolderAsync(folder, cancellationToken).ConfigureAwait(false);
// Calculate date 6 months ago
var sixMonthsAgo = DateTime.UtcNow.AddMonths(-6);
var filterDate = sixMonthsAgo.ToString("yyyy-MM-ddTHH:mm:ssZ");
// Step 2: Process queued mail IDs in batches
await ProcessMailQueueForFolderAsync(folder, downloadedMessageIds, cancellationToken).ConfigureAwait(false);
_logger.Information("Downloading messages received after {FilterDate} for folder {FolderName}", filterDate, folder.FolderName);
// Use Delta API with receivedDateTime filter for last 6 months
var messageCollectionPage = await _graphClient.Me.MailFolders[folder.RemoteFolderId].Messages.Delta.GetAsDeltaGetResponseAsync((config) =>
{
config.QueryParameters.Select = outlookMessageSelectParameters;
config.QueryParameters.Orderby = ["receivedDateTime desc"];
config.QueryParameters.Filter = $"receivedDateTime ge {filterDate}";
}, cancellationToken).ConfigureAwait(false);
var totalProcessed = 0;
// Use PageIterator to process all messages
var messageIterator = PageIterator<Message, DeltaGetResponse>.CreatePageIterator(_graphClient, messageCollectionPage, async (message) =>
{
try
{
await _handleItemRetrievalSemaphore.WaitAsync();
if (!IsResourceDeleted(message.AdditionalData) && !IsNotRealMessageType(message))
{
// Check if message already exists
bool mailExists = await _outlookChangeProcessor.IsMailExistsInFolderAsync(message.Id, folder.Id).ConfigureAwait(false);
if (!mailExists)
{
// Create MailCopy from metadata
var mailCopy = await CreateMailCopyFromMessageAsync(message, folder).ConfigureAwait(false);
if (mailCopy != null)
{
// Create package without MIME
var package = new NewMailItemPackage(mailCopy, null, folder.RemoteFolderId);
bool isInserted = await _outlookChangeProcessor.CreateMailAsync(Account.Id, package).ConfigureAwait(false);
if (isInserted)
{
downloadedMessageIds.Add(mailCopy.Id);
totalProcessed++;
// Update progress periodically
if (totalProcessed % 50 == 0)
{
UpdateSyncProgress(0, 0, $"Downloaded {totalProcessed} messages from {folder.FolderName}");
}
}
}
}
else
{
_logger.Debug("Mail {MailId} already exists in folder {FolderName}, skipping", message.Id, folder.FolderName);
}
}
return true; // Continue processing
}
catch (Exception ex)
{
_logger.Error(ex, "Failed to process message {MessageId} during initial sync for folder {FolderName}", message.Id, folder.FolderName);
return true; // Continue despite error
}
finally
{
_handleItemRetrievalSemaphore.Release();
}
});
await messageIterator.IterateAsync(cancellationToken).ConfigureAwait(false);
// Extract and store delta token for future incremental syncs
if (!string.IsNullOrEmpty(messageIterator.Deltalink))
{
var deltaToken = GetDeltaTokenFromDeltaLink(messageIterator.Deltalink);
await _outlookChangeProcessor.UpdateFolderDeltaSynchronizationIdentifierAsync(folder.Id, deltaToken).ConfigureAwait(false);
await _outlookChangeProcessor.UpdateFolderLastSyncDateAsync(folder.Id).ConfigureAwait(false);
folder.DeltaToken = deltaToken;
_logger.Information("Stored delta token for folder {FolderName} - future syncs will be incremental", folder.FolderName);
}
else
{
_logger.Warning("No delta token received for folder {FolderName} - future syncs may re-download messages", folder.FolderName);
}
_logger.Information("Initial sync completed for folder {FolderName}. Downloaded {Count} messages", folder.FolderName, totalProcessed);
}
catch (ApiException apiException)
{
// Try to handle the error using the error handling factory
// Handle API errors
var errorContext = new SynchronizerErrorContext
{
Account = Account,
@@ -290,22 +368,17 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
if (handled)
{
// The error handler has processed the error (e.g., DeltaTokenExpiredHandler for 410)
// Update in-memory folder state if it was a delta token expiration
if (apiException.ResponseStatusCode == 410)
{
folder.DeltaToken = string.Empty;
folder.FolderStatus = InitialSynchronizationStatus.None;
_logger.Information("API error handled successfully for folder {FolderName} during initial sync. Error: {ErrorCode}", folder.FolderName, apiException.ResponseStatusCode);
}
}
else
{
// No handler could process this error, log and re-throw
_logger.Error(apiException, "Unhandled API error during initial sync for folder {FolderName}. Error: {ErrorCode}", folder.FolderName, apiException.ResponseStatusCode);
}
// Re-throw the exception so the synchronization can be retried
throw;
}
catch (Exception ex)
@@ -315,191 +388,6 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
}
}
/// <summary>
/// Queues the IDs of all non-deleted messages in a folder using the Graph Delta API.
/// Only the <c>Id</c> field is requested to minimize data transfer. When the delta
/// iteration yields a delta link, its token is persisted on the folder so that later
/// syncs can be incremental.
/// </summary>
/// <param name="folder">Folder whose message IDs are queued; its <c>DeltaToken</c> is updated in memory when a new token is received.</param>
/// <param name="cancellationToken">Token used to cancel the Graph requests and the page iteration.</param>
private async Task QueueMailIdsForFolderAsync(MailItemFolder folder, CancellationToken cancellationToken)
{
    _logger.Debug("Queuing mail IDs for folder {FolderName}", folder.FolderName);

    var mailIds = new List<string>();

    // Always use Delta API for initial sync - this ensures proper delta token setup for future incremental syncs
    DeltaGetResponse messageCollectionPage;

    if (string.IsNullOrEmpty(folder.DeltaToken))
    {
        messageCollectionPage = await _graphClient.Me.MailFolders[folder.RemoteFolderId].Messages.Delta.GetAsDeltaGetResponseAsync((config) =>
        {
            config.QueryParameters.Select = ["Id"]; // Only get the message Ids
            config.QueryParameters.Orderby = ["receivedDateTime desc"]; // Sort by received date desc
        }, cancellationToken).ConfigureAwait(false);
    }
    else
    {
        var requestInformation = _graphClient.Me.MailFolders[folder.RemoteFolderId].Messages.Delta.ToGetRequestInformation((config) =>
        {
            config.QueryParameters.Select = ["Id"]; // Only get the message Ids
            config.QueryParameters.Orderby = ["receivedDateTime desc"]; // Sort by received date desc
        });

        // The generated request builder has no delta token parameter, so it is appended to the URL template by hand.
        requestInformation.UrlTemplate = requestInformation.UrlTemplate.Insert(requestInformation.UrlTemplate.Length - 1, ",%24deltatoken");
        requestInformation.QueryParameters.Add("%24deltatoken", folder.DeltaToken);

        messageCollectionPage = await _graphClient.RequestAdapter
            .SendAsync(requestInformation, DeltaGetResponse.CreateFromDiscriminatorValue, cancellationToken: cancellationToken)
            .ConfigureAwait(false);
    }

    // Use PageIterator to iterate through all messages and collect IDs
    var messageIterator = PageIterator<Message, DeltaGetResponse>.CreatePageIterator(_graphClient, messageCollectionPage, (message) =>
    {
        if (!IsResourceDeleted(message.AdditionalData))
        {
            mailIds.Add(message.Id);
        }

        // Iterator must continue all the time to receive delta token at the end.
        return true;
    });

    await messageIterator.IterateAsync(cancellationToken).ConfigureAwait(false);

    // Extract delta token from the iterator's delta link
    string deltaToken = null;
    if (!string.IsNullOrEmpty(messageIterator.Deltalink))
    {
        deltaToken = GetDeltaTokenFromDeltaLink(messageIterator.Deltalink);
    }

    // Queue all mail IDs for processing
    if (mailIds.Count > 0)
    {
        // Materialize the projection: it generates a fresh Guid and timestamp per element,
        // so enumerating a lazy query more than once would yield different queue entries.
        var queueEntries = mailIds.Select(id => new MailItemQueue
        {
            Id = Guid.CreateVersion7(),
            AccountId = Account.Id,
            RemoteServerId = id,
            RemoteFolderId = folder.RemoteFolderId,
            IsProcessed = false,
            CreatedAt = DateTime.UtcNow
        }).ToList();

        await _outlookChangeProcessor.AddMailItemQueueItemsAsync(queueEntries).ConfigureAwait(false);
        _logger.Information("Queued {Count} mail IDs for folder {FolderName}", mailIds.Count, folder.FolderName);
    }
    else
    {
        _logger.Information("No mail ids found to queue for folder {FolderName}", folder.FolderName);
    }

    // Store the delta token for future incremental syncs - always store when available
    if (!string.IsNullOrEmpty(deltaToken))
    {
        await _outlookChangeProcessor.UpdateFolderDeltaSynchronizationIdentifierAsync(folder.Id, deltaToken).ConfigureAwait(false);
        await _outlookChangeProcessor.UpdateFolderLastSyncDateAsync(folder.Id).ConfigureAwait(false);
        folder.DeltaToken = deltaToken;
        _logger.Information("Stored delta token for folder {FolderName} - future syncs will be incremental", folder.FolderName);
    }
    else
    {
        _logger.Warning("No delta token received for folder {FolderName} - future syncs may re-download messages", folder.FolderName);
    }
}
/// <summary>
/// Processes queued mail IDs in batches, downloading metadata only (no MIME).
/// Repeatedly fetches queue entries for the folder, downloads them in chunks of 20,
/// marks each chunk as processed or failed, and reports progress after every successful chunk.
/// </summary>
/// <param name="folder">Folder whose queue entries (matched by RemoteFolderId) are processed.</param>
/// <param name="downloadedMessageIds">Accumulator that receives the IDs returned by each successful chunk download.</param>
/// <param name="cancellationToken">Checked before every chunk and passed to the download call and the back-off delay.</param>
private async Task ProcessMailQueueForFolderAsync(MailItemFolder folder, List<string> downloadedMessageIds, CancellationToken cancellationToken)
{
var totalInQueue = await _outlookChangeProcessor.GetMailItemQueueCountByFolderAsync(Account.Id, folder.RemoteFolderId).ConfigureAwait(false);
if (totalInQueue == 0)
{
_logger.Information("No mails in queue for folder {FolderName}", folder.FolderName);
return;
}
_logger.Information("Processing {Count} queued mails for folder {FolderName}", totalInQueue, folder.FolderName);
// totalFailed counts failed queue items since the last back-off pause (it is reset after each pause);
// totalProcessed counts queue items in successfully downloaded chunks, not the number of IDs the download returned.
var totalFailed = 0;
var totalProcessed = 0;
// Set initial progress for queue processing
UpdateSyncProgress(totalInQueue, totalInQueue, $"Downloading {folder.FolderName}...");
// Continue until all emails in queue are processed
while (true)
{
// Get next batch of unprocessed emails from queue
// (the literal 100 is presumably the maximum batch size — confirm against GetMailItemQueueByFolderAsync)
var mailItemQueue = await _outlookChangeProcessor.GetMailItemQueueByFolderAsync(Account.Id, folder.RemoteFolderId, 100).ConfigureAwait(false);
if (mailItemQueue.Count == 0)
break; // No more emails to process
// Remove the items that should be deleted from queue first
// NOTE(review): this only drops them from the in-memory list; nothing in this method persists a deletion.
// If the query above keeps returning the same entries, this loop may never reach Count == 0 — confirm.
mailItemQueue.RemoveAll(a => a.ShouldDelete());
var mailChunks = mailItemQueue.Chunk(20); // Process 20 at a time
foreach (var chunk in mailChunks)
{
cancellationToken.ThrowIfCancellationRequested();
// Collect message IDs from the chunk
var messageIdsToDownload = chunk.Select(q => q.RemoteServerId).ToList();
try
{
// Download all messages in this chunk concurrently
// (meaning of the literal `true` argument is not visible here — TODO confirm)
var chunkDownloadedIds = await DownloadMessageMetadataBatchAsync(messageIdsToDownload, folder, true, cancellationToken).ConfigureAwait(false);
downloadedMessageIds.AddRange(chunkDownloadedIds);
// Mark all items in chunk as processed
foreach (var queueItem in chunk)
{
queueItem.IsProcessed = true;
queueItem.ProcessedAt = DateTime.UtcNow;
totalProcessed++;
}
// Update progress with remaining items
var remainingItems = totalInQueue - totalProcessed;
UpdateSyncProgress(totalInQueue, remainingItems, $"Downloading {folder.FolderName} ({totalProcessed}/{totalInQueue})");
}
catch (Exception ex)
{
// NOTE(review): this catch-all also catches a cancellation thrown by the download call and counts it as a chunk failure.
_logger.Error(ex, "Failed to download chunk of messages for folder {FolderName}", folder.FolderName);
// Mark all items in chunk as failed
foreach (var queueItem in chunk)
{
queueItem.IsProcessed = false;
queueItem.ProcessedAt = null;
queueItem.FailedCount++;
totalFailed++;
}
}
// The whole fetched batch is passed to the update after every chunk, not just the chunk that changed.
await _outlookChangeProcessor.UpdateMailItemQueueAsync(mailItemQueue).ConfigureAwait(false);
// If too many failures, pause to avoid hitting rate limits
if (totalFailed > 50)
{
_logger.Warning("Too many failures ({Count}), pausing for 10 seconds", totalFailed);
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
totalFailed = 0; // Reset counter
}
}
_logger.Debug("Processed batch: {Processed}/{Total} for folder {FolderName}", totalProcessed, totalInQueue, folder.FolderName);
}
_logger.Information("Completed processing queue for folder {FolderName}. Processed: {Count}", folder.FolderName, totalProcessed);
}
/// <summary>
/// Downloads metadata for a batch of messages using Graph SDK batch API (no MIME content).
/// Processes up to 20 messages per batch request as per MaximumAllowedBatchRequestSize.
@@ -713,12 +601,6 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
/// <summary>
/// Extracts the delta token from a delta link by taking the text that follows the
/// first <c>deltatoken=</c> marker (up to the next occurrence of the marker, if any).
/// </summary>
/// <param name="deltaLink">Delta link that is expected to contain a <c>deltatoken=</c> segment.</param>
/// <returns>The raw token text exactly as it appears in the link (no decoding is applied).</returns>
/// <exception cref="ArgumentNullException"><paramref name="deltaLink"/> is null.</exception>
/// <exception cref="FormatException"><paramref name="deltaLink"/> does not contain <c>deltatoken=</c>.</exception>
private string GetDeltaTokenFromDeltaLink(string deltaLink)
{
    const string Marker = "deltatoken=";

    ArgumentNullException.ThrowIfNull(deltaLink);

    var markerIndex = deltaLink.IndexOf(Marker, StringComparison.Ordinal);
    if (markerIndex < 0)
    {
        // Fail with a descriptive error instead of an opaque IndexOutOfRangeException.
        throw new FormatException($"Delta link does not contain a '{Marker}' segment.");
    }

    var tokenStart = markerIndex + Marker.Length;

    // Stop at a second marker, if present, so the result matches splitting the link on the marker.
    var nextMarkerIndex = deltaLink.IndexOf(Marker, tokenStart, StringComparison.Ordinal);

    return nextMarkerIndex < 0
        ? deltaLink[tokenStart..]
        : deltaLink[tokenStart..nextMarkerIndex];
}
/// <summary>
/// Queues the mail IDs of <paramref name="folder"/> for initial synchronization
/// by delegating to <c>QueueMailIdsForFolderAsync</c>.
/// </summary>
protected override async Task QueueMailIdsForInitialSyncAsync(MailItemFolder folder, CancellationToken cancellationToken = default)
    => await QueueMailIdsForFolderAsync(folder, cancellationToken).ConfigureAwait(false);
protected override async Task<MailCopy> CreateMinimalMailCopyAsync(Message message, MailItemFolder assignedFolder, CancellationToken cancellationToken = default)
{
// Use centralized method
@@ -774,99 +656,6 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
}
}
/// <summary>
/// Requests the changes since the folder's stored delta token from the Graph Delta API,
/// hands every returned message to <c>HandleItemRetrievedAsync</c>, and stores the new
/// delta token when the iteration ends with a delta link.
/// Does nothing when the folder has no delta token.
/// </summary>
/// <param name="folder">Folder to synchronize; its in-memory <c>DeltaToken</c> / <c>FolderStatus</c> are updated here.</param>
/// <param name="downloadedMessageIds">Accumulator passed through to <c>HandleItemRetrievedAsync</c>.</param>
/// <param name="cancellationToken">Token used to cancel the request and the iteration.</param>
/// <exception cref="OperationCanceledException">
/// Propagated when the operation is cancelled, so callers do not treat a cancelled sync as completed.
/// </exception>
private async Task ProcessDeltaChangesAndDownloadMailsAsync(MailItemFolder folder, List<string> downloadedMessageIds, CancellationToken cancellationToken = default)
{
    // Process delta changes and download new mails with metadata only (no MIME)
    if (string.IsNullOrEmpty(folder.DeltaToken))
    {
        _logger.Debug("No delta token available for folder {FolderName}. Skipping delta sync.", folder.FolderName);
        return;
    }

    try
    {
        var currentDeltaToken = folder.DeltaToken;

        // Only a short prefix of the token is logged.
        _logger.Debug("Processing delta changes for folder {FolderName} with token {DeltaToken}", folder.FolderName, currentDeltaToken.Substring(0, Math.Min(10, currentDeltaToken.Length)) + "...");
        _logger.Debug("Delta sync will include all message properties to detect updates (IsRead, Flag, etc.)");

        // Always use Delta endpoint with proper configuration
        var requestInformation = _graphClient.Me.MailFolders[folder.RemoteFolderId].Messages.Delta.ToGetRequestInformation((config) =>
        {
            config.QueryParameters.Select = outlookMessageSelectParameters; // Include all necessary fields for detecting updates
            config.QueryParameters.Orderby = ["receivedDateTime desc"]; // Sort by received date desc
        });

        // The generated request builder has no delta token parameter, so it is appended to the URL template by hand.
        requestInformation.UrlTemplate = requestInformation.UrlTemplate.Insert(requestInformation.UrlTemplate.Length - 1, ",%24deltatoken");
        requestInformation.QueryParameters.Add("%24deltatoken", currentDeltaToken);

        var messageCollectionPage = await _graphClient.RequestAdapter
            .SendAsync(requestInformation, DeltaGetResponse.CreateFromDiscriminatorValue, cancellationToken: cancellationToken)
            .ConfigureAwait(false);

        // Use PageIterator to process delta changes (both new messages and updates)
        var messageIterator = PageIterator<Message, DeltaGetResponse>
            .CreatePageIterator(_graphClient, messageCollectionPage, async (message) =>
            {
                try
                {
                    await HandleItemRetrievedAsync(message, folder, downloadedMessageIds, cancellationToken).ConfigureAwait(false);
                    return true;
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    // A single bad item must not abort the whole delta pass; cancellation still propagates.
                    _logger.Error(ex, "Failed to handle delta item {MessageId} for folder {FolderName}", message.Id, folder.FolderName);
                    return true; // Continue processing other items
                }
            });

        await messageIterator.IterateAsync(cancellationToken).ConfigureAwait(false);

        // Update delta token for next sync - always store when there are no nextPageToken remaining
        if (!string.IsNullOrEmpty(messageIterator.Deltalink))
        {
            var deltaToken = GetDeltaTokenFromDeltaLink(messageIterator.Deltalink);
            await _outlookChangeProcessor.UpdateFolderDeltaSynchronizationIdentifierAsync(folder.Id, deltaToken).ConfigureAwait(false);
            folder.DeltaToken = deltaToken; // Update in-memory object too
            _logger.Debug("Updated delta token for folder {FolderName} after processing delta changes", folder.FolderName);
        }
    }
    catch (ApiException apiException)
    {
        // Try to handle the error using the error handling factory
        var errorContext = new SynchronizerErrorContext
        {
            Account = Account,
            ErrorCode = (int?)apiException.ResponseStatusCode,
            ErrorMessage = $"API error during delta sync: {apiException.Message}",
            Exception = apiException
        };

        var handled = await _errorHandlingFactory.HandleErrorAsync(errorContext).ConfigureAwait(false);

        if (handled)
        {
            // The error handler has processed the error (e.g., DeltaTokenExpiredHandler for 410)
            // Update in-memory folder state if it was a delta token expiration
            if (apiException.ResponseStatusCode == 410)
            {
                folder.DeltaToken = string.Empty;
                folder.FolderStatus = InitialSynchronizationStatus.None;
                _logger.Information("API error handled successfully for folder {FolderName} during delta sync. Error: {ErrorCode}", folder.FolderName, apiException.ResponseStatusCode);
            }
        }
        else
        {
            // No handler could process this error. It is logged only; this method does not re-throw API errors.
            _logger.Error(apiException, "Unhandled API error during delta sync for folder {FolderName}. Error: {ErrorCode}", folder.FolderName, apiException.ResponseStatusCode);
        }
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // Cancellation is excluded by the filter so it reaches the caller instead of being logged as a failure.
        _logger.Error(ex, "Error processing delta changes for folder {FolderName}", folder.FolderName);
    }
}
private async Task ProcessDeltaChangesAsync(MailItemFolder folder, List<string> downloadedMessageIds, CancellationToken cancellationToken = default)
{
// Only process delta changes if we have a delta token (not initial sync)