Outlook sync improvements.
This commit is contained in:
@@ -14,6 +14,7 @@ using System.Text.RegularExpressions;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Graph;
|
||||
using Microsoft.Graph.Me.MailFolders.Item.Messages.Delta;
|
||||
using Microsoft.Graph.Models;
|
||||
using Microsoft.Graph.Models.ODataErrors;
|
||||
using Microsoft.Kiota.Abstractions;
|
||||
@@ -52,7 +53,7 @@ public partial class OutlookSynchronizerJsonContext : JsonSerializerContext;
|
||||
public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message, Event>
|
||||
{
|
||||
public override uint BatchModificationSize => 20;
|
||||
public override uint InitialMessageDownloadCountPerFolder => 250;
|
||||
public override uint InitialMessageDownloadCountPerFolder => 1000;
|
||||
private const uint MaximumAllowedBatchRequestSize = 20;
|
||||
|
||||
private const string INBOX_NAME = "inbox";
|
||||
@@ -87,6 +88,8 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
|
||||
private readonly GraphServiceClient _graphClient;
|
||||
private readonly IOutlookSynchronizerErrorHandlerFactory _errorHandlingFactory;
|
||||
|
||||
private readonly SemaphoreSlim _concurrentDownloadSemaphore = new(10); // Limit to 10 concurrent downloads
|
||||
|
||||
public OutlookSynchronizer(MailAccount account,
|
||||
IAuthenticator authenticator,
|
||||
IOutlookChangeProcessor outlookChangeProcessor,
|
||||
@@ -229,101 +232,520 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
|
||||
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
retry:
|
||||
string latestDeltaLink = string.Empty;
|
||||
_logger.Debug("Synchronizing {FolderName} with direct download approach", folder.FolderName);
|
||||
|
||||
bool isInitialSync = string.IsNullOrEmpty(folder.DeltaToken);
|
||||
|
||||
Microsoft.Graph.Me.MailFolders.Item.Messages.Delta.DeltaGetResponse messageCollectionPage = null;
|
||||
|
||||
_logger.Debug("Synchronizing {FolderName}", folder.FolderName);
|
||||
|
||||
if (isInitialSync)
|
||||
// Check if initial sync is completed for this folder
|
||||
if (!folder.IsInitialSyncCompleted)
|
||||
{
|
||||
_logger.Debug("No sync identifier for Folder {FolderName}. Performing initial sync.", folder.FolderName);
|
||||
_logger.Debug("Initial sync not completed for folder {FolderName}. Starting mail synchronization.", folder.FolderName);
|
||||
|
||||
// No delta link. Performing initial sync.
|
||||
// Download mails for initial sync
|
||||
await DownloadMailsForInitialSyncAsync(folder, downloadedMessageIds, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
messageCollectionPage = await _graphClient.Me.MailFolders[folder.RemoteFolderId].Messages.Delta.GetAsDeltaGetResponseAsync((config) =>
|
||||
{
|
||||
config.QueryParameters.Top = (int)InitialMessageDownloadCountPerFolder;
|
||||
config.QueryParameters.Select = outlookMessageSelectParameters;
|
||||
config.QueryParameters.Orderby = ["receivedDateTime desc"];
|
||||
}, cancellationToken).ConfigureAwait(false);
|
||||
// Mark initial sync as completed
|
||||
await _outlookChangeProcessor.UpdateFolderInitialSyncCompletedAsync(folder.Id, true).ConfigureAwait(false);
|
||||
folder.IsInitialSyncCompleted = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Initial sync is completed, process delta changes and download new mails
|
||||
_logger.Debug("Initial sync completed for folder {FolderName}. Processing delta changes and downloading new mails.", folder.FolderName);
|
||||
|
||||
await ProcessDeltaChangesAndDownloadMailsAsync(folder, downloadedMessageIds, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
await _outlookChangeProcessor.UpdateFolderLastSyncDateAsync(folder.Id).ConfigureAwait(false);
|
||||
|
||||
if (downloadedMessageIds.Any())
|
||||
{
|
||||
_logger.Information("Downloaded {Count} messages for folder {FolderName}", downloadedMessageIds.Count, folder.FolderName);
|
||||
}
|
||||
|
||||
return downloadedMessageIds;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Downloads mails for initial synchronization using Delta API and direct download with concurrency control.
|
||||
/// </summary>
|
||||
private async Task DownloadMailsForInitialSyncAsync(MailItemFolder folder, List<string> downloadedMessageIds, CancellationToken cancellationToken)
|
||||
{
|
||||
_logger.Debug("Starting initial mail download for folder {FolderName}", folder.FolderName);
|
||||
|
||||
var mailIds = new List<string>();
|
||||
|
||||
try
|
||||
{
|
||||
// Always use Delta API for initial sync - this ensures proper delta token setup for future incremental syncs
|
||||
DeltaGetResponse messageCollectionPage = null;
|
||||
|
||||
if (string.IsNullOrEmpty(folder.DeltaToken))
|
||||
{
|
||||
messageCollectionPage = await _graphClient.Me.MailFolders[folder.RemoteFolderId].Messages.Delta.GetAsDeltaGetResponseAsync((config) =>
|
||||
{
|
||||
config.QueryParameters.Select = ["Id"]; // Only get the message Ids
|
||||
config.QueryParameters.Orderby = ["receivedDateTime desc"]; // Sort by received date desc
|
||||
config.QueryParameters.Top = (int)InitialMessageDownloadCountPerFolder;
|
||||
}, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
var requestInformation = _graphClient.Me.MailFolders[folder.RemoteFolderId].Messages.Delta.ToGetRequestInformation((config) =>
|
||||
{
|
||||
config.QueryParameters.Select = ["Id"]; // Only get the message Ids
|
||||
config.QueryParameters.Orderby = ["receivedDateTime desc"]; // Sort by received date desc
|
||||
});
|
||||
|
||||
requestInformation.UrlTemplate = requestInformation.UrlTemplate.Insert(requestInformation.UrlTemplate.Length - 1, ",%24deltatoken");
|
||||
requestInformation.QueryParameters.Add("%24deltatoken", folder.DeltaToken);
|
||||
|
||||
messageCollectionPage = await _graphClient.RequestAdapter.SendAsync(requestInformation, DeltaGetResponse.CreateFromDiscriminatorValue, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
// Use PageIterator<DeltaGetResponse> for iterating through the messages
|
||||
var messageIterator = PageIterator<Message, DeltaGetResponse>.CreatePageIterator(_graphClient, messageCollectionPage, (message) =>
|
||||
{
|
||||
if (!IsResourceDeleted(message.AdditionalData))
|
||||
{
|
||||
mailIds.Add(message.Id);
|
||||
}
|
||||
|
||||
// Iterator must continue all the time to recieve delta token at the end.
|
||||
return true;
|
||||
});
|
||||
|
||||
await messageIterator.IterateAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Extract delta token from the iterator's delta link
|
||||
string deltaToken = null;
|
||||
if (!string.IsNullOrEmpty(messageIterator.Deltalink))
|
||||
{
|
||||
deltaToken = GetDeltaTokenFromDeltaLink(messageIterator.Deltalink);
|
||||
}
|
||||
|
||||
// Download mails concurrently with semaphore control
|
||||
if (mailIds.Any())
|
||||
{
|
||||
_logger.Information("Starting concurrent download of {Count} mails for folder {FolderName}", mailIds.Count, folder.FolderName);
|
||||
await DownloadMailsConcurrentlyAsync(mailIds, folder, downloadedMessageIds, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.Information("No mail ids found to download for folder {FolderName}", folder.FolderName);
|
||||
}
|
||||
|
||||
// Store the delta token for future incremental syncs - always store when available
|
||||
if (!string.IsNullOrEmpty(deltaToken))
|
||||
{
|
||||
await _outlookChangeProcessor.UpdateFolderDeltaSynchronizationIdentifierAsync(folder.Id, deltaToken).ConfigureAwait(false);
|
||||
await _outlookChangeProcessor.UpdateFolderLastSyncDateAsync(folder.Id).ConfigureAwait(false);
|
||||
folder.DeltaToken = deltaToken;
|
||||
_logger.Information("Stored delta token for folder {FolderName} - future syncs will be incremental", folder.FolderName);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.Warning("No delta token received for folder {FolderName} - future syncs may re-download messages", folder.FolderName);
|
||||
}
|
||||
}
|
||||
catch (ApiException apiException)
|
||||
{
|
||||
// Try to handle the error using the error handling factory
|
||||
var errorContext = new SynchronizerErrorContext
|
||||
{
|
||||
Account = Account,
|
||||
ErrorCode = (int?)apiException.ResponseStatusCode,
|
||||
ErrorMessage = $"API error during initial sync: {apiException.Message}",
|
||||
Exception = apiException
|
||||
};
|
||||
|
||||
var handled = await _errorHandlingFactory.HandleErrorAsync(errorContext).ConfigureAwait(false);
|
||||
|
||||
if (handled)
|
||||
{
|
||||
// The error handler has processed the error (e.g., DeltaTokenExpiredHandler for 410)
|
||||
// Update in-memory folder state if it was a delta token expiration
|
||||
if (apiException.ResponseStatusCode == 410)
|
||||
{
|
||||
folder.DeltaToken = string.Empty;
|
||||
folder.IsInitialSyncCompleted = false;
|
||||
_logger.Information("API error handled successfully for folder {FolderName} during initial sync. Error: {ErrorCode}", folder.FolderName, apiException.ResponseStatusCode);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// No handler could process this error, log and re-throw
|
||||
_logger.Error(apiException, "Unhandled API error during initial sync for folder {FolderName}. Error: {ErrorCode}", folder.FolderName, apiException.ResponseStatusCode);
|
||||
}
|
||||
|
||||
// Re-throw the exception so the synchronization can be retried
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Error(ex, "Error occurred during initial mail download for folder {FolderName}", folder.FolderName);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Downloads mails concurrently with semaphore control to limit concurrent downloads to 10.
|
||||
/// </summary>
|
||||
private async Task DownloadMailsConcurrentlyAsync(List<string> mailIds, MailItemFolder folder, List<string> downloadedMessageIds, CancellationToken cancellationToken)
|
||||
{
|
||||
var downloadTasks = mailIds.Select(async mailId =>
|
||||
{
|
||||
await _concurrentDownloadSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var downloaded = await DownloadSingleMailAsync(mailId, folder, cancellationToken).ConfigureAwait(false);
|
||||
if (downloaded != null)
|
||||
{
|
||||
lock (downloadedMessageIds)
|
||||
{
|
||||
downloadedMessageIds.Add(downloaded);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_concurrentDownloadSemaphore.Release();
|
||||
}
|
||||
});
|
||||
|
||||
await Task.WhenAll(downloadTasks).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Downloads a single mail by ID and creates it in the database.
|
||||
/// </summary>
|
||||
private async Task<string> DownloadSingleMailAsync(string mailId, MailItemFolder folder, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Check if mail already exists in database before downloading
|
||||
// to avoid unnecessary API calls and reprocessing existing mails
|
||||
bool mailExists = await _outlookChangeProcessor.IsMailExistsInFolderAsync(mailId, folder.Id).ConfigureAwait(false);
|
||||
if (mailExists)
|
||||
{
|
||||
_logger.Debug("Mail {MailId} already exists in folder {FolderName}, skipping download", mailId, folder.FolderName);
|
||||
return null; // Not a new download
|
||||
}
|
||||
|
||||
// Download the message with minimal properties
|
||||
var message = await GetMessageByIdAsync(mailId, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (message != null)
|
||||
{
|
||||
// Create minimal MailCopy without downloading MIME
|
||||
var mailCopy = await CreateMinimalMailCopyAsync(message, folder, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (mailCopy != null)
|
||||
{
|
||||
// Create a minimal package without MIME for direct sync
|
||||
var package = new NewMailItemPackage(mailCopy, null, folder.RemoteFolderId);
|
||||
bool isInserted = await _outlookChangeProcessor.CreateMailAsync(Account.Id, package).ConfigureAwait(false);
|
||||
|
||||
if (isInserted)
|
||||
{
|
||||
return mailCopy.Id; // Successfully created
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.Warning("Failed to insert mail {MailId} for folder {FolderName}", mailId, folder.FolderName);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.Debug("Could not create MailCopy for {MailId} in folder {FolderName} (might be unsupported message type)", mailId, folder.FolderName);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.Debug("Message {MailId} is null for folder {FolderName} (filtered out)", mailId, folder.FolderName);
|
||||
}
|
||||
}
|
||||
catch (ServiceException serviceException)
|
||||
{
|
||||
// Try to handle the error using the error handling factory first
|
||||
var errorContext = new SynchronizerErrorContext
|
||||
{
|
||||
Account = Account,
|
||||
ErrorCode = (int?)serviceException.ResponseStatusCode,
|
||||
ErrorMessage = $"Service error during mail download: {serviceException.Message}",
|
||||
Exception = serviceException
|
||||
};
|
||||
|
||||
var handled = await _errorHandlingFactory.HandleErrorAsync(errorContext).ConfigureAwait(false);
|
||||
|
||||
if (!handled)
|
||||
{
|
||||
// No handler could process this error, log appropriately
|
||||
if (serviceException.ResponseStatusCode == 404)
|
||||
{
|
||||
_logger.Warning("Mail {MailId} not found on server (404) for folder {FolderName}", mailId, folder.FolderName);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.Error(serviceException, "Unhandled service error while downloading mail {MailId} for folder {FolderName}. Error: {ErrorCode}", mailId, folder.FolderName, serviceException.ResponseStatusCode);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.Information("Service error handled successfully during mail download. Mail: {MailId}, Folder: {FolderName}, Error: {ErrorCode}", mailId, folder.FolderName, serviceException.ResponseStatusCode);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Error(ex, "Error occurred while downloading mail {MailId} for folder {FolderName}", mailId, folder.FolderName);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private string GetDeltaTokenFromDeltaLink(string deltaLink)
|
||||
=> Regex.Split(deltaLink, "deltatoken=")[1];
|
||||
|
||||
protected override async Task QueueMailIdsForInitialSyncAsync(MailItemFolder folder, CancellationToken cancellationToken = default)
|
||||
{
|
||||
// This method is now replaced by direct downloading logic
|
||||
// Instead of queuing mail IDs, we now directly download them with concurrency control
|
||||
var downloadedMessageIds = new List<string>();
|
||||
await DownloadMailsForInitialSyncAsync(folder, downloadedMessageIds, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
protected override Task<MailCopy> CreateMinimalMailCopyAsync(Message message, MailItemFolder assignedFolder, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (message == null) return Task.FromResult<MailCopy>(null);
|
||||
|
||||
// Create MailCopy with minimal properties - no MIME download
|
||||
var mailCopy = message.AsMailCopy();
|
||||
mailCopy.FolderId = assignedFolder.Id;
|
||||
mailCopy.UniqueId = Guid.NewGuid();
|
||||
mailCopy.FileId = Guid.NewGuid();
|
||||
|
||||
return Task.FromResult(mailCopy);
|
||||
}
|
||||
|
||||
private async Task<Message> GetMessageByIdAsync(string messageId, CancellationToken cancellationToken = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
return await _graphClient.Me.Messages[messageId].GetAsync((config) =>
|
||||
{
|
||||
config.QueryParameters.Select = outlookMessageSelectParameters;
|
||||
}, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (ServiceException serviceException)
|
||||
{
|
||||
// Try to handle the error using the error handling factory first
|
||||
var errorContext = new SynchronizerErrorContext
|
||||
{
|
||||
Account = Account,
|
||||
ErrorCode = (int?)serviceException.ResponseStatusCode,
|
||||
ErrorMessage = $"Service error during message retrieval: {serviceException.Message}",
|
||||
Exception = serviceException
|
||||
};
|
||||
|
||||
var handled = await _errorHandlingFactory.HandleErrorAsync(errorContext).ConfigureAwait(false);
|
||||
|
||||
if (!handled)
|
||||
{
|
||||
// No handler could process this error, log and handle appropriately
|
||||
if (serviceException.ResponseStatusCode == 404)
|
||||
{
|
||||
// Re-throw 404 errors to be handled by the caller for queue cleanup
|
||||
throw;
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.Error(serviceException, "Unhandled service error while getting message {MessageId}. Error: {ErrorCode}", messageId, serviceException.ResponseStatusCode);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.Information("Service error handled successfully during message retrieval. Message: {MessageId}, Error: {ErrorCode}", messageId, serviceException.ResponseStatusCode);
|
||||
return null; // Return null since the error was handled but we couldn't get the message
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Error(ex, "Failed to get message {MessageId}", messageId);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessDeltaChangesAndDownloadMailsAsync(MailItemFolder folder, List<string> downloadedMessageIds, CancellationToken cancellationToken = default)
|
||||
{
|
||||
// Process delta changes and directly download new mails
|
||||
if (string.IsNullOrEmpty(folder.DeltaToken))
|
||||
{
|
||||
_logger.Debug("No delta token available for folder {FolderName}. Skipping delta sync.", folder.FolderName);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var currentDeltaToken = folder.DeltaToken;
|
||||
|
||||
_logger.Debug("Processing delta changes for folder {FolderName} with token {DeltaToken}", folder.FolderName, currentDeltaToken.Substring(0, Math.Min(10, currentDeltaToken.Length)) + "...");
|
||||
|
||||
// Always use Delta endpoint with proper configuration
|
||||
var requestInformation = _graphClient.Me.MailFolders[folder.RemoteFolderId].Messages.Delta.ToGetRequestInformation((config) =>
|
||||
{
|
||||
config.QueryParameters.Top = (int)InitialMessageDownloadCountPerFolder;
|
||||
config.QueryParameters.Select = outlookMessageSelectParameters;
|
||||
config.QueryParameters.Orderby = ["receivedDateTime desc"];
|
||||
config.QueryParameters.Select = ["Id"]; // Only get IDs for direct download
|
||||
config.QueryParameters.Orderby = ["receivedDateTime desc"]; // Sort by received date desc
|
||||
});
|
||||
|
||||
requestInformation.UrlTemplate = requestInformation.UrlTemplate.Insert(requestInformation.UrlTemplate.Length - 1, ",%24deltatoken");
|
||||
requestInformation.QueryParameters.Add("%24deltatoken", currentDeltaToken);
|
||||
|
||||
try
|
||||
{
|
||||
messageCollectionPage = await _graphClient.RequestAdapter.SendAsync(requestInformation, Microsoft.Graph.Me.MailFolders.Item.Messages.Delta.DeltaGetResponse.CreateFromDiscriminatorValue, cancellationToken: cancellationToken);
|
||||
}
|
||||
catch (ApiException apiException) when (apiException.ResponseStatusCode == 410)
|
||||
{
|
||||
folder.DeltaToken = string.Empty;
|
||||
var messageCollectionPage = await _graphClient.RequestAdapter.SendAsync(requestInformation,
|
||||
DeltaGetResponse.CreateFromDiscriminatorValue,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
goto retry;
|
||||
var newMailIds = new List<string>();
|
||||
|
||||
// Use PageIterator<DeltaGetResponse> for iterating through delta changes
|
||||
var messageIterator = PageIterator<Message, DeltaGetResponse>
|
||||
.CreatePageIterator(_graphClient, messageCollectionPage, (message) =>
|
||||
{
|
||||
// Only process new messages, not deleted ones
|
||||
if (!IsResourceDeleted(message.AdditionalData))
|
||||
{
|
||||
newMailIds.Add(message.Id);
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
await messageIterator.IterateAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Download new mails directly with concurrency control
|
||||
if (newMailIds.Any())
|
||||
{
|
||||
_logger.Information("Starting direct download of {Count} new mails from delta sync for folder {FolderName}", newMailIds.Count, folder.FolderName);
|
||||
await DownloadMailsConcurrentlyAsync(newMailIds, folder, downloadedMessageIds, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
// Update delta token for next sync - always store when there are no nextPageToken remaining
|
||||
if (!string.IsNullOrEmpty(messageIterator.Deltalink))
|
||||
{
|
||||
var deltaToken = GetDeltaTokenFromDeltaLink(messageIterator.Deltalink);
|
||||
await _outlookChangeProcessor.UpdateFolderDeltaSynchronizationIdentifierAsync(folder.Id, deltaToken).ConfigureAwait(false);
|
||||
folder.DeltaToken = deltaToken; // Update in-memory object too
|
||||
_logger.Debug("Updated delta token for folder {FolderName} after processing delta changes", folder.FolderName);
|
||||
}
|
||||
}
|
||||
|
||||
var messageIteratorAsync = PageIterator<Message, Microsoft.Graph.Me.MailFolders.Item.Messages.Delta.DeltaGetResponse>.CreatePageIterator(_graphClient, messageCollectionPage, async (item) =>
|
||||
catch (ApiException apiException)
|
||||
{
|
||||
try
|
||||
// Try to handle the error using the error handling factory
|
||||
var errorContext = new SynchronizerErrorContext
|
||||
{
|
||||
await _handleItemRetrievalSemaphore.WaitAsync();
|
||||
return await HandleItemRetrievedAsync(item, folder, downloadedMessageIds, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
Account = Account,
|
||||
ErrorCode = (int?)apiException.ResponseStatusCode,
|
||||
ErrorMessage = $"API error during delta sync: {apiException.Message}",
|
||||
Exception = apiException
|
||||
};
|
||||
|
||||
var handled = await _errorHandlingFactory.HandleErrorAsync(errorContext).ConfigureAwait(false);
|
||||
|
||||
if (handled)
|
||||
{
|
||||
_logger.Error(ex, "Error occurred while handling item {Id} for folder {FolderName}", item.Id, folder.FolderName);
|
||||
// The error handler has processed the error (e.g., DeltaTokenExpiredHandler for 410)
|
||||
// Update in-memory folder state if it was a delta token expiration
|
||||
if (apiException.ResponseStatusCode == 410)
|
||||
{
|
||||
folder.DeltaToken = string.Empty;
|
||||
folder.IsInitialSyncCompleted = false;
|
||||
_logger.Information("API error handled successfully for folder {FolderName} during delta sync. Error: {ErrorCode}", folder.FolderName, apiException.ResponseStatusCode);
|
||||
}
|
||||
}
|
||||
finally
|
||||
else
|
||||
{
|
||||
_handleItemRetrievalSemaphore.Release();
|
||||
// No handler could process this error, log and re-throw
|
||||
_logger.Error(apiException, "Unhandled API error during delta sync for folder {FolderName}. Error: {ErrorCode}", folder.FolderName, apiException.ResponseStatusCode);
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
await messageIteratorAsync
|
||||
.IterateAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
latestDeltaLink = messageIteratorAsync.Deltalink;
|
||||
|
||||
if (downloadedMessageIds.Any())
|
||||
{
|
||||
_logger.Debug("Downloaded {Count} messages for folder {FolderName}", downloadedMessageIds.Count, folder.FolderName);
|
||||
}
|
||||
|
||||
//Store delta link for tracking new changes.
|
||||
if (!string.IsNullOrEmpty(latestDeltaLink))
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Parse Delta Token from Delta Link since v5 of Graph SDK works based on the token, not the link.
|
||||
|
||||
var deltaToken = GetDeltaTokenFromDeltaLink(latestDeltaLink);
|
||||
|
||||
await _outlookChangeProcessor.UpdateFolderDeltaSynchronizationIdentifierAsync(folder.Id, deltaToken).ConfigureAwait(false);
|
||||
_logger.Error(ex, "Error processing delta changes for folder {FolderName}", folder.FolderName);
|
||||
}
|
||||
|
||||
await _outlookChangeProcessor.UpdateFolderLastSyncDateAsync(folder.Id).ConfigureAwait(false);
|
||||
|
||||
return downloadedMessageIds;
|
||||
}
|
||||
|
||||
private string GetDeltaTokenFromDeltaLink(string deltaLink)
|
||||
=> Regex.Split(deltaLink, "deltatoken=")[1];
|
||||
private async Task ProcessDeltaChangesAsync(MailItemFolder folder, List<string> downloadedMessageIds, CancellationToken cancellationToken = default)
|
||||
{
|
||||
// Only process delta changes if we have a delta token (not initial sync)
|
||||
if (string.IsNullOrEmpty(folder.DeltaToken))
|
||||
return;
|
||||
|
||||
try
|
||||
{
|
||||
var currentDeltaToken = folder.DeltaToken;
|
||||
|
||||
// Always use Delta endpoint with proper configuration
|
||||
var requestInformation = _graphClient.Me.MailFolders[folder.RemoteFolderId].Messages.Delta.ToGetRequestInformation((config) =>
|
||||
{
|
||||
config.QueryParameters.Select = outlookMessageSelectParameters;
|
||||
config.QueryParameters.Orderby = ["receivedDateTime desc"]; // Sort by received date desc
|
||||
});
|
||||
|
||||
requestInformation.UrlTemplate = requestInformation.UrlTemplate.Insert(requestInformation.UrlTemplate.Length - 1, ",%24deltatoken");
|
||||
requestInformation.QueryParameters.Add("%24deltatoken", currentDeltaToken);
|
||||
|
||||
var messageCollectionPage = await _graphClient.RequestAdapter.SendAsync(requestInformation,
|
||||
DeltaGetResponse.CreateFromDiscriminatorValue,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
// Use PageIterator<DeltaGetResponse> for iterating mails
|
||||
var messageIterator = PageIterator<Message, DeltaGetResponse>
|
||||
.CreatePageIterator(_graphClient, messageCollectionPage, async (item) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await _handleItemRetrievalSemaphore.WaitAsync();
|
||||
return await HandleItemRetrievedAsync(item, folder, downloadedMessageIds, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Error(ex, "Error occurred while handling delta item {Id} for folder {FolderName}", item.Id, folder.FolderName);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_handleItemRetrievalSemaphore.Release();
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
await messageIterator.IterateAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Update delta token for next sync - store delta token when there are no nextPageToken remaining
|
||||
if (!string.IsNullOrEmpty(messageIterator.Deltalink))
|
||||
{
|
||||
var deltaToken = GetDeltaTokenFromDeltaLink(messageIterator.Deltalink);
|
||||
await _outlookChangeProcessor.UpdateFolderDeltaSynchronizationIdentifierAsync(folder.Id, deltaToken).ConfigureAwait(false);
|
||||
_logger.Debug("Updated delta token for folder {FolderName} after processing delta changes", folder.FolderName);
|
||||
}
|
||||
}
|
||||
catch (ApiException apiException)
|
||||
{
|
||||
// Try to handle the error using the error handling factory
|
||||
var errorContext = new SynchronizerErrorContext
|
||||
{
|
||||
Account = Account,
|
||||
ErrorCode = (int?)apiException.ResponseStatusCode,
|
||||
ErrorMessage = $"API error during legacy delta sync: {apiException.Message}",
|
||||
Exception = apiException
|
||||
};
|
||||
|
||||
var handled = await _errorHandlingFactory.HandleErrorAsync(errorContext).ConfigureAwait(false);
|
||||
|
||||
if (!handled)
|
||||
{
|
||||
// No handler could process this error, log and re-throw
|
||||
_logger.Error(apiException, "Unhandled API error during legacy delta sync for folder {FolderName}. Error: {ErrorCode}", folder.FolderName, apiException.ResponseStatusCode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private bool IsResourceDeleted(IDictionary<string, object> additionalData)
|
||||
=> additionalData != null && additionalData.ContainsKey("@removed");
|
||||
@@ -551,10 +973,44 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
|
||||
Microsoft.Graph.Me.MailFolders.Delta.DeltaGetResponse.CreateFromDiscriminatorValue,
|
||||
cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (ApiException apiException) when (apiException.ResponseStatusCode == 410)
|
||||
catch (ApiException apiException)
|
||||
{
|
||||
Account.SynchronizationDeltaIdentifier = string.Empty;
|
||||
return await GetDeltaFoldersAsync(cancellationToken);
|
||||
// Try to handle the error using the error handling factory
|
||||
var errorContext = new SynchronizerErrorContext
|
||||
{
|
||||
Account = Account,
|
||||
ErrorCode = (int?)apiException.ResponseStatusCode,
|
||||
ErrorMessage = $"API error during folder synchronization: {apiException.Message}",
|
||||
Exception = apiException
|
||||
};
|
||||
|
||||
var handled = await _errorHandlingFactory.HandleErrorAsync(errorContext).ConfigureAwait(false);
|
||||
|
||||
if (handled)
|
||||
{
|
||||
// The error handler has processed the error (e.g., DeltaTokenExpiredHandler for 410)
|
||||
// Update in-memory account state if it was a delta token expiration
|
||||
if (apiException.ResponseStatusCode == 410)
|
||||
{
|
||||
Account.SynchronizationDeltaIdentifier = string.Empty;
|
||||
_logger.Information("API error handled successfully for account {AccountName} during folder sync. Error: {ErrorCode}", Account.Name, apiException.ResponseStatusCode);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// No handler could process this error, log and re-throw
|
||||
_logger.Error(apiException, "Unhandled API error during folder synchronization for account {AccountName}. Error: {ErrorCode}", Account.Name, apiException.ResponseStatusCode);
|
||||
throw;
|
||||
}
|
||||
|
||||
// If a handler processed the error and it was 410, retry with fresh token
|
||||
if (apiException.ResponseStatusCode == 410)
|
||||
{
|
||||
return await GetDeltaFoldersAsync(cancellationToken);
|
||||
}
|
||||
|
||||
// For other handled errors, we still need to throw since we can't return a meaningful response
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user