Fixed the Gmail sync identifier update issue and removed the batch message download.

This commit is contained in:
Burak Kaan Köse
2025-03-19 23:22:57 +01:00
parent ac64c35efa
commit 13b495b0f6
5 changed files with 90 additions and 129 deletions

View File

@@ -24,7 +24,7 @@ namespace Wino.Core.Integration.Processors;
public interface IDefaultChangeProcessor
{
Task UpdateAccountAsync(MailAccount account);
Task<string> UpdateAccountDeltaSynchronizationIdentifierAsync(Guid accountId, string deltaSynchronizationIdentifier);
// Task<string> UpdateAccountDeltaSynchronizationIdentifierAsync(Guid accountId, string deltaSynchronizationIdentifier);
Task DeleteAssignmentAsync(Guid accountId, string mailCopyId, string remoteFolderId);
Task ChangeMailReadStatusAsync(string mailCopyId, bool isRead);
Task ChangeFlagStatusAsync(string mailCopyId, bool isFlagged);
@@ -64,6 +64,7 @@ public interface IDefaultChangeProcessor
/// <returns>Whether mail exists in the folder or not.</returns>
Task<bool> IsMailExistsInFolderAsync(string messageId, Guid folderId);
Task<List<string>> AreMailsExistsAsync(IEnumerable<string> mailCopyIds);
Task<string> UpdateAccountDeltaSynchronizationIdentifierAsync(Guid accountId, string synchronizationDeltaIdentifier);
}
public interface IGmailChangeProcessor : IDefaultChangeProcessor
@@ -122,7 +123,7 @@ public class DefaultChangeProcessor(IDatabaseService databaseService,
private readonly IMimeFileService _mimeFileService = mimeFileService;
public Task<string> UpdateAccountDeltaSynchronizationIdentifierAsync(Guid accountId, string synchronizationDeltaIdentifier)
=> AccountService.UpdateSynchronizationIdentifierAsync(accountId, synchronizationDeltaIdentifier);
=> AccountService.UpdateSyncIdentifierRawAsync(accountId, synchronizationDeltaIdentifier);
public Task ChangeFlagStatusAsync(string mailCopyId, bool isFlagged)
=> MailService.ChangeFlagStatusAsync(mailCopyId, isFlagged);

View File

@@ -22,7 +22,7 @@ public class OutlookChangeProcessor(IDatabaseService databaseService,
{
public Task<string> ResetAccountDeltaTokenAsync(Guid accountId)
=> AccountService.UpdateSynchronizationIdentifierAsync(accountId, null);
=> AccountService.UpdateSyncIdentifierRawAsync(accountId, string.Empty);
public async Task<string> ResetFolderDeltaTokenAsync(Guid folderId)
{

View File

@@ -274,7 +274,10 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
}
// Start downloading missing messages.
await BatchDownloadMessagesAsync(missingMessageIds, cancellationToken).ConfigureAwait(false);
foreach (var messageId in missingMessageIds)
{
await DownloadSingleMessageAsync(messageId, cancellationToken).ConfigureAwait(false);
}
// Map archive assignments if there are any changes reported.
if (listChanges.Any() || deltaChanges.Any())
@@ -292,14 +295,20 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
}
// Take the max history id from delta changes and update the account sync modifier.
var maxHistoryId = deltaChanges.Max(a => a.HistoryId);
if (maxHistoryId != null)
if (deltaChanges.Any())
{
// TODO: This is not good. Centralize the identifier fetch and prevent direct access here.
Account.SynchronizationDeltaIdentifier = await _gmailChangeProcessor.UpdateAccountDeltaSynchronizationIdentifierAsync(Account.Id, maxHistoryId.ToString()).ConfigureAwait(false);
var maxHistoryId = deltaChanges.Where(a => a.HistoryId != null).Max(a => a.HistoryId);
_logger.Debug("Final sync identifier {SynchronizationDeltaIdentifier}", Account.SynchronizationDeltaIdentifier);
await UpdateAccountSyncIdentifierAsync(maxHistoryId);
if (maxHistoryId != null)
{
// TODO: This is not good. Centralize the identifier fetch and prevent direct access here.
// Account.SynchronizationDeltaIdentifier = await _gmailChangeProcessor.UpdateAccountDeltaSynchronizationIdentifierAsync(Account.Id, maxHistoryId.ToString()).ConfigureAwait(false);
_logger.Debug("Final sync identifier {SynchronizationDeltaIdentifier}", Account.SynchronizationDeltaIdentifier);
}
}
// Get all unred new downloaded items and return in the result.
@@ -310,6 +319,26 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
return MailSynchronizationResult.Completed(unreadNewItems);
}
private async Task DownloadSingleMessageAsync(string messageId, CancellationToken cancellationToken = default)
{
// Google .NET SDK has memory issues with batch downloading messages which will not be fixed since the library is in maintenance mode.
// https://github.com/googleapis/google-api-dotnet-client/issues/2603
// This method will be used to download messages one by one to prevent memory spikes.
try
{
var singleRequest = CreateSingleMessageGet(messageId);
var downloadedMessage = await singleRequest.ExecuteAsync(cancellationToken).ConfigureAwait(false);
await HandleSingleItemDownloadedCallbackAsync(downloadedMessage, null, messageId, cancellationToken).ConfigureAwait(false);
await UpdateAccountSyncIdentifierAsync(downloadedMessage.HistoryId).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.Error(ex, "Error while downloading message {MessageId} for {Name}", messageId, Account.Name);
}
}
protected override async Task<CalendarSynchronizationResult> SynchronizeCalendarEventsInternalAsync(CalendarSynchronizationOptions options, CancellationToken cancellationToken = default)
{
_logger.Information("Internal calendar synchronization started for {Name}", Account.Name);
@@ -640,87 +669,6 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
return singleRequest;
}
/// <summary>
/// Downloads given message ids per batch and processes them.
/// </summary>
/// <param name="messageIds">Gmail message ids to download.</param>
/// <param name="cancellationToken">Cancellation token.</param>
private async Task BatchDownloadMessagesAsync(List<string> messageIds, CancellationToken cancellationToken = default)
{
var totalDownloadCount = messageIds.Count();
if (totalDownloadCount == 0) return;
var downloadedItemCount = 0;
_logger.Debug("Batch downloading {Count} messages for {Name}", messageIds.Count(), Account.Name);
var allDownloadRequests = messageIds.Select(CreateSingleMessageGet);
// Respect the batch size limit for batch requests.
var batchedDownloadRequests = allDownloadRequests.Batch((int)MaximumAllowedBatchRequestSize);
_logger.Information("Total items to download: {TotalDownloadCount}. Created {Count} batch download requests for {Name}.", batchedDownloadRequests.Count(), Account.Name, totalDownloadCount);
// Gmail SDK's BatchRequest has Action delegate for callback, not Task.
// Therefore it's not possible to make sure that downloaded item is processed in the database before this
// async callback is finished. Therefore we need to wrap all local database processings into task list and wait all of them to finish
// Batch execution finishes after response parsing is done.
var batchProcessCallbacks = new List<Task<Message>>();
foreach (var batchBundle in batchedDownloadRequests)
{
cancellationToken.ThrowIfCancellationRequested();
var batchRequest = new BatchRequest(_gmailService);
// Queue each request into this batch.
batchBundle.ForEach(request =>
{
batchRequest.Queue<Message>(request, (content, error, index, message) =>
{
var downloadingMessageId = messageIds.ElementAt(index);
var downloadTask = HandleSingleItemDownloadedCallbackAsync(content, error, downloadingMessageId, cancellationToken);
batchProcessCallbacks.Add(downloadTask);
downloadedItemCount++;
var progressValue = downloadedItemCount * 100 / Math.Max(1, totalDownloadCount);
PublishSynchronizationProgress(progressValue);
});
});
_logger.Information("Executing batch download with {Count} items.", batchRequest.Count);
await batchRequest.ExecuteAsync(cancellationToken);
// This is important due to bug in Gmail SDK.
// We force GC here to prevent Out of Memory exception.
// https://github.com/googleapis/google-api-dotnet-client/issues/2603
GC.Collect();
}
// Wait for all processing to finish.
await Task.WhenAll(batchProcessCallbacks).ConfigureAwait(false);
// Try to update max history id.
var historyIdMessages = batchProcessCallbacks.Select(a => a.Result).Where(a => a?.HistoryId != null);
if (historyIdMessages.Any())
{
var maxHistoryId = historyIdMessages.Max(a => a.HistoryId.Value);
if (maxHistoryId > 0)
{
Account.SynchronizationDeltaIdentifier = await _gmailChangeProcessor.UpdateAccountDeltaSynchronizationIdentifierAsync(Account.Id, maxHistoryId.ToString()).ConfigureAwait(false);
}
}
}
/// <summary>
/// Processes the delta changes for the given history changes.
/// Message downloads are not handled here since it's better to batch them.
@@ -1060,7 +1008,10 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
var downloadRequireMessageIds = messageIds.Except(await _gmailChangeProcessor.AreMailsExistsAsync(messageIds));
// Download missing messages.
await BatchDownloadMessagesAsync(downloadRequireMessageIds.ToList(), cancellationToken);
foreach (var messageId in downloadRequireMessageIds)
{
await DownloadSingleMessageAsync(messageId, cancellationToken).ConfigureAwait(false);
}
// Get results from database and return.
@@ -1225,16 +1176,21 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
return message;
}
private async Task UpdateAccountSyncIdentifierFromMessageAsync(Message message)
private bool ShouldUpdateSyncIdentifier(ulong? historyId)
{
// Try updating the history change identifier if any.
if (message.HistoryId == null) return;
if (historyId == null) return false;
if (ulong.TryParse(Account.SynchronizationDeltaIdentifier, out ulong currentIdentifier) &&
ulong.TryParse(message.HistoryId.Value.ToString(), out ulong messageIdentifier) &&
messageIdentifier > currentIdentifier)
var newHistoryId = historyId.Value;
return Account.SynchronizationDeltaIdentifier == null ||
(ulong.TryParse(Account.SynchronizationDeltaIdentifier, out ulong currentIdentifier) && newHistoryId > currentIdentifier);
}
private async Task UpdateAccountSyncIdentifierAsync(ulong? historyId)
{
if (ShouldUpdateSyncIdentifier(historyId))
{
Account.SynchronizationDeltaIdentifier = await _gmailChangeProcessor.UpdateAccountDeltaSynchronizationIdentifierAsync(Account.Id, message.HistoryId.ToString());
Account.SynchronizationDeltaIdentifier = await _gmailChangeProcessor.UpdateAccountDeltaSynchronizationIdentifierAsync(Account.Id, historyId.Value.ToString());
}
}
@@ -1252,7 +1208,7 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
if (gmailMessage == null) return;
await HandleSingleItemDownloadedCallbackAsync(gmailMessage, error, string.Empty, cancellationToken);
await UpdateAccountSyncIdentifierFromMessageAsync(gmailMessage).ConfigureAwait(false);
await UpdateAccountSyncIdentifierAsync(gmailMessage.HistoryId).ConfigureAwait(false);
}
else if (bundle is HttpRequestBundle<IClientServiceRequest, Label> folderBundle)
{