Better thread handling for mail collection.

This commit is contained in:
Burak Kaan Köse
2026-04-06 11:21:51 +02:00
parent c8265e75be
commit ff05195416
3 changed files with 201 additions and 52 deletions
@@ -1,3 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions; using FluentAssertions;
using Wino.Core.Domain.Entities.Mail; using Wino.Core.Domain.Entities.Mail;
using Wino.Core.Domain.Enums; using Wino.Core.Domain.Enums;
@@ -113,6 +117,57 @@ public class WinoMailCollectionTests
threadItems.Should().Contain(item => item.ThreadId == "thread-c" && item.EmailCount == 2); threadItems.Should().Contain(item => item.ThreadId == "thread-c" && item.EmailCount == 2);
} }
[Fact]
public async Task AddRangeAsync_ShouldKeepGroupsAndItemsSortedDuringHighVolumeInitialization()
{
var sut = CreateCollection();
var baseDate = new DateTime(2026, 4, 6, 12, 0, 0, DateTimeKind.Utc);
var items = Enumerable.Range(0, 240)
.Select(index =>
{
var dayOffset = index % 4;
var minuteOffset = 240 - index;
return new MailItemViewModel(CreateMailCopy(
threadId: $"single-{index}",
creationDate: baseDate.AddDays(-dayOffset).AddMinutes(-minuteOffset)));
})
.OrderByDescending(item => item.MailCopy.UniqueId)
.ToList();
await sut.AddRangeAsync(items, clearIdCache: true);
var groups = new List<(DateTime Key, List<IMailListItem> Items)>();
foreach (var group in sut.MailItems)
{
var groupItems = new List<IMailListItem>();
foreach (var item in group)
{
groupItems.Add(item);
}
groups.Add(((DateTime)group.Key, groupItems));
}
groups.Should().NotBeEmpty();
var orderedGroupKeys = groups.Select(group => group.Key).ToList();
orderedGroupKeys.Should().BeInDescendingOrder();
foreach (var group in groups)
{
group.Items.Should().OnlyContain(item => item is MailItemViewModel);
var creationDates = group.Items
.Cast<MailItemViewModel>()
.Select(item => item.MailCopy.CreationDate)
.ToList();
creationDates.Should().BeInDescendingOrder();
}
}
[Fact] [Fact]
public async Task UpdateMailCopy_ShouldMergeExistingSingles_WhenThreadIdChangesToMatch() public async Task UpdateMailCopy_ShouldMergeExistingSingles_WhenThreadIdChangesToMatch()
{ {
@@ -155,6 +210,48 @@ public class WinoMailCollectionTests
threadItem.GetContainingIds().Should().BeEquivalentTo([existing.UniqueId, incoming.UniqueId]); threadItem.GetContainingIds().Should().BeEquivalentTo([existing.UniqueId, incoming.UniqueId]);
} }
[Fact]
public async Task AddAsync_ShouldRemainConsistentUnderHighVolumeConcurrentAdds()
{
var sut = CreateCollection();
var threadCount = 40;
var mailsPerThread = 25;
var baseDate = new DateTime(2026, 4, 6, 12, 0, 0, DateTimeKind.Utc);
var mails = Enumerable.Range(0, threadCount)
.SelectMany(threadIndex => Enumerable.Range(0, mailsPerThread)
.Select(mailIndex => CreateMailCopy(
threadId: $"thread-{threadIndex}",
creationDate: baseDate.AddMinutes(-(threadIndex * mailsPerThread + mailIndex)))))
.OrderBy(_ => Guid.NewGuid())
.ToList();
await Task.WhenAll(mails.Select(mail => Task.Run(() => sut.AddAsync(mail))));
var flattenedMailIds = FlattenMailItems(sut)
.Select(item => item.MailCopy.UniqueId)
.ToList();
flattenedMailIds.Should().HaveCount(threadCount * mailsPerThread);
flattenedMailIds.Should().OnlyHaveUniqueItems();
flattenedMailIds.Should().BeEquivalentTo(mails.Select(mail => mail.UniqueId));
var topLevelItems = FlattenItems(sut);
topLevelItems.Should().HaveCount(threadCount);
topLevelItems.Should().OnlyContain(item => item is ThreadMailItemViewModel);
foreach (var thread in topLevelItems.Cast<ThreadMailItemViewModel>())
{
thread.EmailCount.Should().Be(mailsPerThread);
var expectedIds = mails
.Where(mail => mail.ThreadId == thread.ThreadId)
.Select(mail => mail.UniqueId);
thread.GetContainingIds().Should().BeEquivalentTo(expectedIds);
}
}
private static WinoMailCollection CreateCollection() => new() private static WinoMailCollection CreateCollection() => new()
{ {
CoreDispatcher = new ImmediateDispatcher() CoreDispatcher = new ImmediateDispatcher()
@@ -175,6 +272,28 @@ public class WinoMailCollectionTests
return items; return items;
} }
private static List<MailItemViewModel> FlattenMailItems(WinoMailCollection collection)
{
var items = new List<MailItemViewModel>();
foreach (var group in collection.MailItems)
{
foreach (var item in group)
{
if (item is MailItemViewModel mailItem)
{
items.Add(mailItem);
}
else if (item is ThreadMailItemViewModel threadItem)
{
items.AddRange(threadItem.ThreadEmails);
}
}
}
return items;
}
private static MailCopy CreateMailCopy(string threadId, DateTime? creationDate = null) private static MailCopy CreateMailCopy(string threadId, DateTime? creationDate = null)
=> new() => new()
{ {
@@ -2,6 +2,7 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using CommunityToolkit.Mvvm.Collections; using CommunityToolkit.Mvvm.Collections;
using CommunityToolkit.Mvvm.ComponentModel; using CommunityToolkit.Mvvm.ComponentModel;
@@ -42,6 +43,7 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
private ListItemComparer listComparer = new(); private ListItemComparer listComparer = new();
private readonly ObservableGroupedCollection<object, IMailListItem> _mailItemSource = new ObservableGroupedCollection<object, IMailListItem>(); private readonly ObservableGroupedCollection<object, IMailListItem> _mailItemSource = new ObservableGroupedCollection<object, IMailListItem>();
private readonly SemaphoreSlim _mutationGate = new(1, 1);
public ReadOnlyObservableGroupedCollection<object, IMailListItem> MailItems { get; } public ReadOnlyObservableGroupedCollection<object, IMailListItem> MailItems { get; }
@@ -106,25 +108,28 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
public async Task ClearAsync() public async Task ClearAsync()
{ {
await ExecuteUIThread(() => await RunSerializedAsync(async () =>
{ {
foreach (var group in _mailItemSource) await ExecuteUIThread(() =>
{ {
foreach (var item in group) foreach (var group in _mailItemSource)
{ {
if (item is ThreadMailItemViewModel threadItem) foreach (var item in group)
{ {
threadItem.UnregisterThreadEmailPropertyChangedHandlers(); if (item is ThreadMailItemViewModel threadItem)
{
threadItem.UnregisterThreadEmailPropertyChangedHandlers();
}
} }
} }
}
_mailItemSource.Clear(); _mailItemSource.Clear();
MailCopyIdHashSet.Clear(); MailCopyIdHashSet.Clear();
_threadIdToItemsMap.Clear(); _threadIdToItemsMap.Clear();
_itemToGroupMap.Clear(); _itemToGroupMap.Clear();
_uniqueIdToMailItemMap.Clear(); _uniqueIdToMailItemMap.Clear();
_uniqueIdToThreadMap.Clear(); _uniqueIdToThreadMap.Clear();
});
}); });
} }
@@ -434,7 +439,10 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
await InsertItemInternalAsync(newGroupKey, threadViewModel); await InsertItemInternalAsync(newGroupKey, threadViewModel);
} }
public async Task AddAsync(MailCopy addedItem) public Task AddAsync(MailCopy addedItem)
=> RunSerializedAsync(() => AddInternalAsync(addedItem));
private async Task AddInternalAsync(MailCopy addedItem)
{ {
// First check if this is an update to an existing item // First check if this is an update to an existing item
if (MailCopyIdHashSet.ContainsKey(addedItem.UniqueId)) if (MailCopyIdHashSet.ContainsKey(addedItem.UniqueId))
@@ -504,8 +512,8 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
private async Task ReinsertUpdatedItemAsync(MailCopy updatedItem, bool isSelected, bool isBusy) private async Task ReinsertUpdatedItemAsync(MailCopy updatedItem, bool isSelected, bool isBusy)
{ {
await RemoveAsync(updatedItem); await RemoveInternalAsync(updatedItem);
await AddAsync(updatedItem); await AddInternalAsync(updatedItem);
var updatedContainer = GetMailItemContainer(updatedItem.UniqueId); var updatedContainer = GetMailItemContainer(updatedItem.UniqueId);
if (updatedContainer?.ItemViewModel == null) if (updatedContainer?.ItemViewModel == null)
@@ -584,7 +592,10 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
/// <summary> /// <summary>
/// Adds multiple emails to the collection. /// Adds multiple emails to the collection.
/// </summary> /// </summary>
public async Task AddRangeAsync(IEnumerable<MailItemViewModel> items, bool clearIdCache) public Task AddRangeAsync(IEnumerable<MailItemViewModel> items, bool clearIdCache)
=> RunSerializedAsync(() => AddRangeInternalAsync(items, clearIdCache));
private async Task AddRangeInternalAsync(IEnumerable<MailItemViewModel> items, bool clearIdCache)
{ {
if (clearIdCache) if (clearIdCache)
{ {
@@ -717,17 +728,22 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
// Group items by their grouping key and add them in a single UI thread call // Group items by their grouping key and add them in a single UI thread call
if (itemsToAdd.Count > 0) if (itemsToAdd.Count > 0)
{ {
// Pre-compute grouping on background thread to reduce UI thread work
var groupedItems = await Task.Run(() => itemsToAdd var groupedItems = await Task.Run(() => itemsToAdd
.GroupBy(GetGroupingKey) .GroupBy(GetGroupingKey)
.ToDictionary(g => g.Key, g => g.ToList())).ConfigureAwait(false); .OrderBy(group => group.Key, listComparer)
.Select(group => new
{
Key = group.Key,
Items = group.OrderBy(item => (object)item, listComparer).ToList()
})
.ToList()).ConfigureAwait(false);
await ExecuteUIThread(() => await ExecuteUIThread(() =>
{ {
foreach (var kvp in groupedItems) foreach (var groupedItem in groupedItems)
{ {
var groupKey = kvp.Key; var groupKey = groupedItem.Key;
var groupItems = kvp.Value; var groupItems = groupedItem.Items;
// Update caches first // Update caches first
foreach (var item in groupItems) foreach (var item in groupItems)
@@ -736,25 +752,14 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
UpdateThreadIdCache(item, true); UpdateThreadIdCache(item, true);
} }
var existingGroup = _mailItemSource.FirstGroupByKeyOrDefault(groupKey); foreach (var item in groupItems)
if (existingGroup == null)
{ {
var newGroup = new ObservableGroup<object, IMailListItem>(groupKey, groupItems); _mailItemSource.InsertItem(groupKey, listComparer, item, listComparer);
_mailItemSource.AddGroup(groupKey, newGroup);
// Update item-to-group cache var targetGroup = _mailItemSource.FirstGroupByKeyOrDefault(groupKey);
foreach (var item in groupItems) if (targetGroup != null)
{ {
_itemToGroupMap[item] = newGroup; _itemToGroupMap[item] = targetGroup;
}
}
else
{
foreach (var item in groupItems)
{
existingGroup.Add(item);
_itemToGroupMap[item] = existingGroup;
} }
} }
} }
@@ -814,7 +819,7 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
{ {
if (CoreDispatcher == null) return Task.CompletedTask; if (CoreDispatcher == null) return Task.CompletedTask;
return CoreDispatcher.ExecuteOnUIThread(() => return RunSerializedAsync(() => CoreDispatcher.ExecuteOnUIThread(() =>
{ {
foreach (var group in _mailItemSource) foreach (var group in _mailItemSource)
{ {
@@ -836,7 +841,7 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
} }
} }
} }
}); }));
} }
/// <summary> /// <summary>
@@ -845,16 +850,17 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
/// <param name="updatedMailCopy">Updated mail copy.</param> /// <param name="updatedMailCopy">Updated mail copy.</param>
/// <returns></returns> /// <returns></returns>
public Task UpdateMailCopy(MailCopy updatedMailCopy, MailUpdateSource mailUpdateSource, MailCopyChangeFlags changedProperties = MailCopyChangeFlags.None) public Task UpdateMailCopy(MailCopy updatedMailCopy, MailUpdateSource mailUpdateSource, MailCopyChangeFlags changedProperties = MailCopyChangeFlags.None)
{ => RunSerializedAsync(() =>
var itemContainer = GetMailItemContainer(updatedMailCopy.UniqueId);
if (itemContainer?.ItemViewModel == null)
{ {
return Task.CompletedTask; var itemContainer = GetMailItemContainer(updatedMailCopy.UniqueId);
}
return UpdateExistingItemAsync(itemContainer, updatedMailCopy, mailUpdateSource, changedProperties); if (itemContainer?.ItemViewModel == null)
} {
return Task.CompletedTask;
}
return UpdateExistingItemAsync(itemContainer, updatedMailCopy, mailUpdateSource, changedProperties);
});
public MailItemViewModel GetFirst() => AllItems.ElementAtOrDefault(0); public MailItemViewModel GetFirst() => AllItems.ElementAtOrDefault(0);
@@ -921,7 +927,10 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
return null; return null;
} }
public async Task RemoveAsync(MailCopy removeItem) public Task RemoveAsync(MailCopy removeItem)
=> RunSerializedAsync(() => RemoveInternalAsync(removeItem));
private async Task RemoveInternalAsync(MailCopy removeItem)
{ {
var itemContainer = GetMailItemContainer(removeItem.UniqueId); var itemContainer = GetMailItemContainer(removeItem.UniqueId);
@@ -1121,6 +1130,20 @@ public class WinoMailCollection : ObservableRecipient, IRecipient<SelectedItemsC
private Task ExecuteUIThread(Action action) => CoreDispatcher?.ExecuteOnUIThread(action); private Task ExecuteUIThread(Action action) => CoreDispatcher?.ExecuteOnUIThread(action);
private async Task RunSerializedAsync(Func<Task> action)
{
await _mutationGate.WaitAsync().ConfigureAwait(false);
try
{
await action().ConfigureAwait(false);
}
finally
{
_mutationGate.Release();
}
}
public void Receive(SelectedItemsChangedMessage message) => _ = NotifySelectionChangesAsync(); public void Receive(SelectedItemsChangedMessage message) => _ = NotifySelectionChangesAsync();
private async Task NotifySelectionChangesAsync() private async Task NotifySelectionChangesAsync()
+11 -4
View File
@@ -764,11 +764,17 @@ public partial class MailListPageViewModel : MailBaseViewModel,
if (addedMail.AssignedAccount == null || addedMail.AssignedFolder == null) return; if (addedMail.AssignedAccount == null || addedMail.AssignedFolder == null) return;
bool hasLock = false;
try try
{ {
await listManipulationSemepahore.WaitAsync();
hasLock = true;
if (ActiveFolder == null) return; if (ActiveFolder == null) return;
// At least one of the accounts we are listing must match with the account of the added mail. // Re-evaluate folder membership after acquiring the semaphore so an add that was queued
// behind a folder re-initialization cannot land in the newly selected folder by mistake.
if (!ActiveFolder.HandlingFolders.Any(a => a.MailAccountId == addedMail.AssignedAccount.Id)) return; if (!ActiveFolder.HandlingFolders.Any(a => a.MailAccountId == addedMail.AssignedAccount.Id)) return;
// Fix for draft duplication: When a draft is created for reply/forward, it's first added as local draft. // Fix for draft duplication: When a draft is created for reply/forward, it's first added as local draft.
@@ -838,8 +844,6 @@ public partial class MailListPageViewModel : MailBaseViewModel,
if (!IsMailMatchingLocalSearch(addedMail)) return; if (!IsMailMatchingLocalSearch(addedMail)) return;
} }
await listManipulationSemepahore.WaitAsync();
// AddAsync already handles UI threading internally, no need to wrap it // AddAsync already handles UI threading internally, no need to wrap it
await MailCollection.AddAsync(addedMail); await MailCollection.AddAsync(addedMail);
@@ -851,7 +855,10 @@ public partial class MailListPageViewModel : MailBaseViewModel,
catch { } catch { }
finally finally
{ {
listManipulationSemepahore.Release(); if (hasLock)
{
listManipulationSemepahore.Release();
}
} }
} }