Fixing UI thread issues with bulk operations and request queue refactoring.
This commit is contained in:
@@ -29,6 +29,8 @@ namespace Wino.Core.Synchronizers.ImapSync;
|
||||
public class UnifiedImapSynchronizer
|
||||
{
|
||||
private static readonly TimeSpan UidReconcileInterval = TimeSpan.FromHours(12);
|
||||
private const int NewMessageFetchBatchSize = 50;
|
||||
private const int ExistingMessageFlagFetchBatchSize = 250;
|
||||
|
||||
private readonly ILogger _logger = Log.ForContext<UnifiedImapSynchronizer>();
|
||||
private readonly IFolderService _folderService;
|
||||
@@ -47,6 +49,9 @@ public class UnifiedImapSynchronizer
|
||||
MessageSummaryItems.References |
|
||||
MessageSummaryItems.ModSeq |
|
||||
MessageSummaryItems.BodyStructure;
|
||||
private readonly MessageSummaryItems _existingMailSynchronizationFlags =
|
||||
MessageSummaryItems.Flags |
|
||||
MessageSummaryItems.UniqueId;
|
||||
|
||||
public UnifiedImapSynchronizer(
|
||||
IFolderService folderService,
|
||||
@@ -182,15 +187,35 @@ public class UnifiedImapSynchronizer
|
||||
|
||||
var downloadedMessageIds = new List<string>();
|
||||
|
||||
foreach (var batch in uids.Distinct().OrderBy(a => a.Id).Batch(50))
|
||||
foreach (var batch in uids.Distinct().OrderBy(a => a.Id).Batch(ExistingMessageFlagFetchBatchSize))
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var summaryBatch = await remoteFolder
|
||||
.FetchAsync(new UniqueIdSet(batch.ToList(), SortOrder.Ascending), _mailSynchronizationFlags, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
var batchUids = batch.ToList();
|
||||
var existingMails = await _mailService.GetExistingMailsAsync(localFolder.Id, batchUids).ConfigureAwait(false);
|
||||
var existingByUid = CreateExistingMailLookup(existingMails);
|
||||
var existingUids = batchUids.Where(uid => existingByUid.ContainsKey(uid.Id)).ToList();
|
||||
var newUids = batchUids.Where(uid => !existingByUid.ContainsKey(uid.Id)).ToList();
|
||||
|
||||
downloadedMessageIds.AddRange(await ProcessSummariesAsync(synchronizer, localFolder, summaryBatch, cancellationToken).ConfigureAwait(false));
|
||||
if (existingUids.Count > 0)
|
||||
{
|
||||
var existingSummaryBatch = await remoteFolder
|
||||
.FetchAsync(new UniqueIdSet(existingUids, SortOrder.Ascending), _existingMailSynchronizationFlags, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await ApplySummaryFlagUpdatesAsync(existingByUid, existingSummaryBatch).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
foreach (var newBatch in newUids.Batch(NewMessageFetchBatchSize))
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var newSummaryBatch = await remoteFolder
|
||||
.FetchAsync(new UniqueIdSet(newBatch.ToList(), SortOrder.Ascending), _mailSynchronizationFlags, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
downloadedMessageIds.AddRange(await ProcessSummariesCoreAsync(synchronizer, localFolder, newSummaryBatch, existingByUid, cancellationToken).ConfigureAwait(false));
|
||||
}
|
||||
}
|
||||
|
||||
UpdateHighestKnownUid(localFolder, remoteFolder, uids.Select(a => a.Id));
|
||||
@@ -268,7 +293,29 @@ public class UnifiedImapSynchronizer
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
downloadedMessageIds = await DownloadMessagesByUidsAsync(client, remoteFolder, folder, changedUids, synchronizer, cancellationToken).ConfigureAwait(false);
|
||||
var existingMails = await _mailService.GetExistingMailsAsync(folder.Id, changedUids).ConfigureAwait(false);
|
||||
var existingByUid = CreateExistingMailLookup(existingMails);
|
||||
var newOrUnknownUids = changedUids.Where(uid => !existingByUid.ContainsKey(uid.Id)).ToList();
|
||||
var existingUidsWithoutFlagEvents = changedUids
|
||||
.Where(uid => existingByUid.ContainsKey(uid.Id) && !changedFlags.ContainsKey(uid.Id))
|
||||
.ToList();
|
||||
|
||||
if (existingUidsWithoutFlagEvents.Count > 0)
|
||||
{
|
||||
var missingEventSummaries = await remoteFolder
|
||||
.FetchAsync(new UniqueIdSet(existingUidsWithoutFlagEvents, SortOrder.Ascending), _existingMailSynchronizationFlags, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
foreach (var summary in missingEventSummaries)
|
||||
{
|
||||
if (summary.UniqueId != UniqueId.Invalid && summary.Flags != null)
|
||||
{
|
||||
changedFlags[summary.UniqueId.Id] = summary.Flags.Value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
downloadedMessageIds = await DownloadMessagesByUidsAsync(client, remoteFolder, folder, newOrUnknownUids, synchronizer, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
folder.HighestModeSeq = unchecked((long)remoteFolder.HighestModSeq);
|
||||
|
||||
@@ -456,11 +503,19 @@ public class UnifiedImapSynchronizer
|
||||
folder.UidValidity = remoteFolder.UidValidity;
|
||||
}
|
||||
|
||||
private async Task<List<string>> ProcessSummariesAsync(
|
||||
private Task<List<string>> ProcessSummariesAsync(
|
||||
IImapSynchronizer synchronizer,
|
||||
MailItemFolder localFolder,
|
||||
IList<IMessageSummary> summaries,
|
||||
CancellationToken cancellationToken)
|
||||
=> ProcessSummariesCoreAsync(synchronizer, localFolder, summaries, existingByUid: null, cancellationToken);
|
||||
|
||||
private async Task<List<string>> ProcessSummariesCoreAsync(
|
||||
IImapSynchronizer synchronizer,
|
||||
MailItemFolder localFolder,
|
||||
IList<IMessageSummary> summaries,
|
||||
IReadOnlyDictionary<uint, MailCopy> existingByUid,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var downloadedMessageIds = new List<string>();
|
||||
|
||||
@@ -475,10 +530,8 @@ public class UnifiedImapSynchronizer
|
||||
if (uniqueIds.Count == 0)
|
||||
return downloadedMessageIds;
|
||||
|
||||
var existingMails = await _mailService.GetExistingMailsAsync(localFolder.Id, uniqueIds).ConfigureAwait(false);
|
||||
var existingByUid = existingMails
|
||||
.Select(m => (Uid: MailkitClientExtensions.ResolveUidStruct(m.Id), Mail: m))
|
||||
.ToDictionary(a => a.Uid.Id, a => a.Mail);
|
||||
existingByUid ??= CreateExistingMailLookup(await _mailService.GetExistingMailsAsync(localFolder.Id, uniqueIds).ConfigureAwait(false));
|
||||
var pendingStateUpdates = new List<MailCopyStateUpdate>();
|
||||
|
||||
foreach (var summary in summaries)
|
||||
{
|
||||
@@ -491,7 +544,11 @@ public class UnifiedImapSynchronizer
|
||||
{
|
||||
if (summary.Flags != null)
|
||||
{
|
||||
await UpdateMailFlagsAsync(existingMail, summary.Flags.Value).ConfigureAwait(false);
|
||||
var pendingStateUpdate = CreateMailStateUpdate(existingMail, summary.Flags.Value);
|
||||
if (pendingStateUpdate != null)
|
||||
{
|
||||
pendingStateUpdates.Add(pendingStateUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
@@ -516,23 +573,79 @@ public class UnifiedImapSynchronizer
|
||||
}
|
||||
}
|
||||
|
||||
if (pendingStateUpdates.Count > 0)
|
||||
{
|
||||
await _mailService.ApplyMailStateUpdatesAsync(pendingStateUpdates).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return downloadedMessageIds;
|
||||
}
|
||||
|
||||
private async Task UpdateMailFlagsAsync(MailCopy mailCopy, MessageFlags flags)
|
||||
private async Task ApplySummaryFlagUpdatesAsync(
|
||||
IReadOnlyDictionary<uint, MailCopy> existingByUid,
|
||||
IList<IMessageSummary> summaries)
|
||||
{
|
||||
if (existingByUid == null || existingByUid.Count == 0 || summaries == null || summaries.Count == 0)
|
||||
return;
|
||||
|
||||
var pendingStateUpdates = new List<MailCopyStateUpdate>();
|
||||
|
||||
foreach (var summary in summaries)
|
||||
{
|
||||
if (summary.UniqueId == UniqueId.Invalid || summary.Flags == null)
|
||||
continue;
|
||||
|
||||
if (!existingByUid.TryGetValue(summary.UniqueId.Id, out var existingMail))
|
||||
continue;
|
||||
|
||||
var pendingStateUpdate = CreateMailStateUpdate(existingMail, summary.Flags.Value);
|
||||
if (pendingStateUpdate != null)
|
||||
{
|
||||
pendingStateUpdates.Add(pendingStateUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
if (pendingStateUpdates.Count > 0)
|
||||
{
|
||||
await _mailService.ApplyMailStateUpdatesAsync(pendingStateUpdates).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private static IReadOnlyDictionary<uint, MailCopy> CreateExistingMailLookup(IEnumerable<MailCopy> existingMails)
|
||||
{
|
||||
var lookup = new Dictionary<uint, MailCopy>();
|
||||
|
||||
foreach (var mail in existingMails ?? [])
|
||||
{
|
||||
if (mail == null || string.IsNullOrEmpty(mail.Id))
|
||||
continue;
|
||||
|
||||
try
|
||||
{
|
||||
lookup[MailkitClientExtensions.ResolveUidStruct(mail.Id).Id] = mail;
|
||||
}
|
||||
catch (ArgumentOutOfRangeException)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
return lookup;
|
||||
}
|
||||
|
||||
private static MailCopyStateUpdate CreateMailStateUpdate(MailCopy mailCopy, MessageFlags flags)
|
||||
{
|
||||
var isFlagged = MailkitClientExtensions.GetIsFlagged(flags);
|
||||
var isRead = MailkitClientExtensions.GetIsRead(flags);
|
||||
|
||||
if (isFlagged != mailCopy.IsFlagged)
|
||||
{
|
||||
await _mailService.ChangeFlagStatusAsync(mailCopy.Id, isFlagged).ConfigureAwait(false);
|
||||
}
|
||||
bool shouldUpdateFlagged = isFlagged != mailCopy.IsFlagged;
|
||||
bool shouldUpdateRead = isRead != mailCopy.IsRead;
|
||||
|
||||
if (isRead != mailCopy.IsRead)
|
||||
{
|
||||
await _mailService.ChangeReadStatusAsync(mailCopy.Id, isRead).ConfigureAwait(false);
|
||||
}
|
||||
return !shouldUpdateFlagged && !shouldUpdateRead
|
||||
? null
|
||||
: new MailCopyStateUpdate(
|
||||
mailCopy.Id,
|
||||
shouldUpdateRead ? isRead : null,
|
||||
shouldUpdateFlagged ? isFlagged : null);
|
||||
}
|
||||
|
||||
private async Task ApplyDeletedUidsAsync(MailItemFolder folder, IList<UniqueId> uniqueIds)
|
||||
@@ -552,15 +665,14 @@ public class UnifiedImapSynchronizer
|
||||
if (changedFlags == null || changedFlags.Count == 0)
|
||||
return;
|
||||
|
||||
foreach (var changed in changedFlags)
|
||||
{
|
||||
var localMailCopyId = MailkitClientExtensions.CreateUid(folder.Id, changed.Key);
|
||||
var isFlagged = MailkitClientExtensions.GetIsFlagged(changed.Value);
|
||||
var isRead = MailkitClientExtensions.GetIsRead(changed.Value);
|
||||
var stateUpdates = changedFlags
|
||||
.Select(changed => new MailCopyStateUpdate(
|
||||
MailkitClientExtensions.CreateUid(folder.Id, changed.Key),
|
||||
MailkitClientExtensions.GetIsRead(changed.Value),
|
||||
MailkitClientExtensions.GetIsFlagged(changed.Value)))
|
||||
.ToList();
|
||||
|
||||
await _mailService.ChangeReadStatusAsync(localMailCopyId, isRead).ConfigureAwait(false);
|
||||
await _mailService.ChangeFlagStatusAsync(localMailCopyId, isFlagged).ConfigureAwait(false);
|
||||
}
|
||||
await _mailService.ApplyMailStateUpdatesAsync(stateUpdates).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task ReconcileUidBasedFlagChangesAsync(MailItemFolder localFolder, IMailFolder remoteFolder, CancellationToken cancellationToken)
|
||||
@@ -613,13 +725,14 @@ public class UnifiedImapSynchronizer
|
||||
|
||||
var existingMarkReadCandidates = await FilterExistingRemoteUidsAsync(remoteFolder, markReadCandidates, cancellationToken).ConfigureAwait(false);
|
||||
var existingUnflagCandidates = await FilterExistingRemoteUidsAsync(remoteFolder, unflagCandidates, cancellationToken).ConfigureAwait(false);
|
||||
var pendingStateUpdates = new List<MailCopyStateUpdate>();
|
||||
|
||||
foreach (var uid in existingMarkReadCandidates)
|
||||
{
|
||||
if (!localByUid.TryGetValue(uid, out var localMail) || localMail.IsRead)
|
||||
continue;
|
||||
|
||||
await _mailService.ChangeReadStatusAsync(localMail.Id, true).ConfigureAwait(false);
|
||||
pendingStateUpdates.Add(new MailCopyStateUpdate(localMail.Id, IsRead: true));
|
||||
}
|
||||
|
||||
foreach (var uid in remoteUnreadUids)
|
||||
@@ -627,7 +740,7 @@ public class UnifiedImapSynchronizer
|
||||
if (!localByUid.TryGetValue(uid, out var localMail) || !localMail.IsRead)
|
||||
continue;
|
||||
|
||||
await _mailService.ChangeReadStatusAsync(localMail.Id, false).ConfigureAwait(false);
|
||||
pendingStateUpdates.Add(new MailCopyStateUpdate(localMail.Id, IsRead: false));
|
||||
}
|
||||
|
||||
foreach (var uid in existingUnflagCandidates)
|
||||
@@ -635,7 +748,7 @@ public class UnifiedImapSynchronizer
|
||||
if (!localByUid.TryGetValue(uid, out var localMail) || !localMail.IsFlagged)
|
||||
continue;
|
||||
|
||||
await _mailService.ChangeFlagStatusAsync(localMail.Id, false).ConfigureAwait(false);
|
||||
pendingStateUpdates.Add(new MailCopyStateUpdate(localMail.Id, IsFlagged: false));
|
||||
}
|
||||
|
||||
foreach (var uid in remoteFlaggedUids)
|
||||
@@ -643,7 +756,12 @@ public class UnifiedImapSynchronizer
|
||||
if (!localByUid.TryGetValue(uid, out var localMail) || localMail.IsFlagged)
|
||||
continue;
|
||||
|
||||
await _mailService.ChangeFlagStatusAsync(localMail.Id, true).ConfigureAwait(false);
|
||||
pendingStateUpdates.Add(new MailCopyStateUpdate(localMail.Id, IsFlagged: true));
|
||||
}
|
||||
|
||||
if (pendingStateUpdates.Count > 0)
|
||||
{
|
||||
await _mailService.ApplyMailStateUpdatesAsync(pendingStateUpdates).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user