Implemented cache reset for Gmail history id expiration. (#581)
This commit is contained in:
@@ -6,6 +6,7 @@ using MimeKit;
|
||||
using Wino.Core.Domain.Entities.Calendar;
|
||||
using Wino.Core.Domain.Entities.Mail;
|
||||
using Wino.Core.Domain.Entities.Shared;
|
||||
using Wino.Core.Domain.Enums;
|
||||
using Wino.Core.Domain.Interfaces;
|
||||
using Wino.Core.Domain.Models.MailItem;
|
||||
using Wino.Core.Domain.Models.Synchronization;
|
||||
@@ -61,10 +62,12 @@ public interface IDefaultChangeProcessor
|
||||
Task UpdateCalendarDeltaSynchronizationToken(Guid calendarId, string deltaToken);
|
||||
Task<MailCopy> GetMailCopyAsync(string mailCopyId);
|
||||
Task CreateMailRawAsync(MailAccount account, MailItemFolder mailItemFolder, NewMailItemPackage package);
|
||||
Task DeleteUserMailCacheAsync(Guid accountId);
|
||||
}
|
||||
|
||||
public interface IGmailChangeProcessor : IDefaultChangeProcessor
|
||||
{
|
||||
Task<bool> HasAccountAnyDraftAsync(Guid accountId);
|
||||
Task MapLocalDraftAsync(string mailCopyId, string newDraftId, string newThreadId);
|
||||
Task CreateAssignmentAsync(Guid accountId, string mailCopyId, string remoteFolderId);
|
||||
Task ManageCalendarEventAsync(Event calendarEvent, AccountCalendar assignedCalendar, MailAccount organizerAccount);
|
||||
@@ -214,4 +217,10 @@ public class DefaultChangeProcessor(IDatabaseService databaseService,
|
||||
|
||||
public Task UpdateCalendarDeltaSynchronizationToken(Guid calendarId, string deltaToken)
|
||||
=> CalendarService.UpdateCalendarDeltaSynchronizationToken(calendarId, deltaToken);
|
||||
|
||||
public async Task DeleteUserMailCacheAsync(Guid accountId)
|
||||
{
|
||||
await _mimeFileService.DeleteUserMimeCacheAsync(accountId).ConfigureAwait(false);
|
||||
await AccountService.DeleteAccountMailCacheAsync(accountId, AccountCacheResetReason.ExpiredCache).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -308,4 +308,6 @@ public class GmailChangeProcessor : DefaultChangeProcessor, IGmailChangeProcesso
|
||||
};
|
||||
}
|
||||
|
||||
public Task<bool> HasAccountAnyDraftAsync(Guid accountId)
|
||||
=> MailService.HasAccountAnyDraftAsync(accountId);
|
||||
}
|
||||
|
||||
@@ -43,7 +43,13 @@ namespace Wino.Core.Synchronizers.Mail;
|
||||
public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message, Event>, IHttpClientFactory
|
||||
{
|
||||
public override uint BatchModificationSize => 1000;
|
||||
public override uint InitialMessageDownloadCountPerFolder => 1200;
|
||||
|
||||
/// <summary>
|
||||
/// This is NOT the initial message download count per folder.
|
||||
/// Gmail doesn't have per-folder sync. Therefore this represents to total amount that 1 page query returns until
|
||||
/// there are no pages to get. Max allowed is 500.
|
||||
/// </summary>
|
||||
public override uint InitialMessageDownloadCountPerFolder => 500;
|
||||
|
||||
// It's actually 100. But Gmail SDK has internal bug for Out of Memory exception.
|
||||
// https://github.com/googleapis/google-api-dotnet-client/issues/2603
|
||||
@@ -143,6 +149,7 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
|
||||
|
||||
if (options.Type == MailSynchronizationType.FoldersOnly) return MailSynchronizationResult.Empty;
|
||||
|
||||
retry:
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
bool isInitialSync = string.IsNullOrEmpty(Account.SynchronizationDeltaIdentifier);
|
||||
@@ -204,25 +211,43 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
|
||||
var historyRequest = _gmailService.Users.History.List("me");
|
||||
historyRequest.StartHistoryId = startHistoryId;
|
||||
|
||||
while (!string.IsNullOrEmpty(nextPageToken))
|
||||
try
|
||||
{
|
||||
// If this is the first delta check, start from the last history id.
|
||||
// Otherwise start from the next page token. We set them both to the same value for start.
|
||||
// For each different page we set the page token to the next page token.
|
||||
while (!string.IsNullOrEmpty(nextPageToken))
|
||||
{
|
||||
// If this is the first delta check, start from the last history id.
|
||||
// Otherwise start from the next page token. We set them both to the same value for start.
|
||||
// For each different page we set the page token to the next page token.
|
||||
|
||||
bool isFirstDeltaCheck = nextPageToken == startHistoryId.ToString();
|
||||
bool isFirstDeltaCheck = nextPageToken == startHistoryId.ToString();
|
||||
|
||||
if (!isFirstDeltaCheck)
|
||||
historyRequest.PageToken = nextPageToken;
|
||||
if (!isFirstDeltaCheck)
|
||||
historyRequest.PageToken = nextPageToken;
|
||||
|
||||
var historyResponse = await historyRequest.ExecuteAsync(cancellationToken);
|
||||
var historyResponse = await historyRequest.ExecuteAsync(cancellationToken);
|
||||
|
||||
nextPageToken = historyResponse.NextPageToken;
|
||||
nextPageToken = historyResponse.NextPageToken;
|
||||
|
||||
if (historyResponse.History == null)
|
||||
continue;
|
||||
if (historyResponse.History == null)
|
||||
continue;
|
||||
|
||||
deltaChanges.Add(historyResponse);
|
||||
deltaChanges.Add(historyResponse);
|
||||
}
|
||||
}
|
||||
catch (GoogleApiException ex) when (ex.HttpStatusCode == System.Net.HttpStatusCode.NotFound)
|
||||
{
|
||||
// History ID is too old or expired, need to do a full sync.
|
||||
// Theoratically we need to delete the local cache and start from scratch.
|
||||
|
||||
_logger.Warning("History ID {StartHistoryId} is expired for {Name}. Will remove user's mail cache and do full sync.", startHistoryId, Account.Name);
|
||||
|
||||
await _gmailChangeProcessor.DeleteUserMailCacheAsync(Account.Id).ConfigureAwait(false);
|
||||
|
||||
Account.SynchronizationDeltaIdentifier = string.Empty;
|
||||
|
||||
await _gmailChangeProcessor.UpdateAccountAsync(Account).ConfigureAwait(false);
|
||||
|
||||
goto retry;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -612,7 +637,7 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
|
||||
// async callback is finished. Therefore we need to wrap all local database processings into task list and wait all of them to finish
|
||||
// Batch execution finishes after response parsing is done.
|
||||
|
||||
var batchProcessCallbacks = new List<Task>();
|
||||
var batchProcessCallbacks = new List<Task<Message>>();
|
||||
|
||||
foreach (var batchBundle in batchedDownloadRequests)
|
||||
{
|
||||
@@ -626,8 +651,9 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
|
||||
batchRequest.Queue<Message>(request, (content, error, index, message) =>
|
||||
{
|
||||
var downloadingMessageId = messageIds.ElementAt(index);
|
||||
var downloadTask = HandleSingleItemDownloadedCallbackAsync(content, error, downloadingMessageId, cancellationToken);
|
||||
|
||||
batchProcessCallbacks.Add(HandleSingleItemDownloadedCallbackAsync(content, error, downloadingMessageId, cancellationToken));
|
||||
batchProcessCallbacks.Add(downloadTask);
|
||||
|
||||
downloadedItemCount++;
|
||||
|
||||
@@ -650,6 +676,15 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
|
||||
|
||||
// Wait for all processing to finish.
|
||||
await Task.WhenAll(batchProcessCallbacks).ConfigureAwait(false);
|
||||
|
||||
// Try to update max history id.
|
||||
var maxHistoryId = batchProcessCallbacks.Select(a => a.Result).Where(a => a.HistoryId != null).Max(a => a.HistoryId.Value);
|
||||
|
||||
if (maxHistoryId != 0)
|
||||
{
|
||||
Account.SynchronizationDeltaIdentifier = await _gmailChangeProcessor.UpdateAccountDeltaSynchronizationIdentifierAsync(Account.Id, maxHistoryId.ToString()).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -1100,7 +1135,7 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
|
||||
/// <param name="error"></param>
|
||||
/// <param name="httpResponseMessage"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
private async Task HandleSingleItemDownloadedCallbackAsync(Message message,
|
||||
private async Task<Message> HandleSingleItemDownloadedCallbackAsync(Message message,
|
||||
RequestError error,
|
||||
string downloadingMessageId,
|
||||
CancellationToken cancellationToken = default)
|
||||
@@ -1126,7 +1161,7 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
|
||||
{
|
||||
_logger.Warning("Skipped GMail message download for {DownloadingMessageId}", downloadingMessageId);
|
||||
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
// Gmail has LabelId property for each message.
|
||||
@@ -1135,20 +1170,27 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
|
||||
|
||||
// If CreateNewMailPackagesAsync returns null it means local draft mapping is done.
|
||||
// We don't need to insert anything else.
|
||||
if (mailPackage == null)
|
||||
return;
|
||||
if (mailPackage == null) return message;
|
||||
|
||||
foreach (var package in mailPackage)
|
||||
{
|
||||
await _gmailChangeProcessor.CreateMailAsync(Account.Id, package).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
private async Task UpdateAccountSyncIdentifierFromMessageAsync(Message message)
|
||||
{
|
||||
// Try updating the history change identifier if any.
|
||||
if (message.HistoryId == null) return;
|
||||
|
||||
// Delta changes also has history id but the maximum id is preserved in the account service.
|
||||
// TODO: This is not good. Centralize the identifier fetch and prevent direct access here.
|
||||
Account.SynchronizationDeltaIdentifier = await _gmailChangeProcessor.UpdateAccountDeltaSynchronizationIdentifierAsync(Account.Id, message.HistoryId.ToString());
|
||||
if (ulong.TryParse(Account.SynchronizationDeltaIdentifier, out ulong currentIdentifier) &&
|
||||
ulong.TryParse(message.HistoryId.Value.ToString(), out ulong messageIdentifier) &&
|
||||
messageIdentifier > currentIdentifier)
|
||||
{
|
||||
Account.SynchronizationDeltaIdentifier = await _gmailChangeProcessor.UpdateAccountDeltaSynchronizationIdentifierAsync(Account.Id, message.HistoryId.ToString());
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessSingleNativeRequestResponseAsync(IRequestBundle<IClientServiceRequest> bundle,
|
||||
@@ -1164,7 +1206,8 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
|
||||
|
||||
if (gmailMessage == null) return;
|
||||
|
||||
await HandleSingleItemDownloadedCallbackAsync(gmailMessage, error, "unknown", cancellationToken);
|
||||
await HandleSingleItemDownloadedCallbackAsync(gmailMessage, error, string.Empty, cancellationToken);
|
||||
await UpdateAccountSyncIdentifierFromMessageAsync(gmailMessage).ConfigureAwait(false);
|
||||
}
|
||||
else if (bundle is HttpRequestBundle<IClientServiceRequest, Label> folderBundle)
|
||||
{
|
||||
@@ -1210,9 +1253,12 @@ public class GmailSynchronizer : WinoSynchronizer<IClientServiceRequest, Message
|
||||
/// </summary>
|
||||
private async Task MapDraftIdsAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
// TODO: This call is not necessary if we don't have any local drafts.
|
||||
// Remote drafts will be downloaded in missing message batches anyways.
|
||||
// Fix it by checking whether we need to do this or not.
|
||||
// Check if account has any draft locally.
|
||||
// There is no point to send this query if there are no local drafts.
|
||||
|
||||
bool hasLocalDrafts = await _gmailChangeProcessor.HasAccountAnyDraftAsync(Account.Id).ConfigureAwait(false);
|
||||
|
||||
if (!hasLocalDrafts) return;
|
||||
|
||||
var drafts = await _gmailService.Users.Drafts.List("me").ExecuteAsync(cancellationToken);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user