Handling of generalException and some refactorings on batch executions.
This commit is contained in:
@@ -4,6 +4,7 @@ using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Nodes;
|
||||
@@ -16,6 +17,7 @@ using Microsoft.Graph.Models;
|
||||
using Microsoft.Graph.Models.ODataErrors;
|
||||
using Microsoft.Kiota.Abstractions;
|
||||
using Microsoft.Kiota.Abstractions.Authentication;
|
||||
using Microsoft.Kiota.Abstractions.Serialization;
|
||||
using Microsoft.Kiota.Http.HttpClientLibrary.Middleware;
|
||||
using Microsoft.Kiota.Http.HttpClientLibrary.Middleware.Options;
|
||||
using MimeKit;
|
||||
@@ -445,95 +447,121 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
|
||||
|
||||
private async Task SynchronizeFoldersAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
// Gather special folders by default.
|
||||
// Others will be other type.
|
||||
var specialFolderInfo = await GetSpecialFolderIdsAsync(cancellationToken).ConfigureAwait(false);
|
||||
var graphFolders = await GetDeltaFoldersAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Get well known folder ids by batch.
|
||||
|
||||
retry:
|
||||
var wellKnownFolderIdBatch = new BatchRequestContentCollection(_graphClient);
|
||||
|
||||
var inboxRequest = _graphClient.Me.MailFolders[INBOX_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; });
|
||||
var sentRequest = _graphClient.Me.MailFolders[SENT_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; });
|
||||
var deletedRequest = _graphClient.Me.MailFolders[DELETED_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; });
|
||||
var junkRequest = _graphClient.Me.MailFolders[JUNK_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; });
|
||||
var draftsRequest = _graphClient.Me.MailFolders[DRAFTS_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; });
|
||||
var archiveRequest = _graphClient.Me.MailFolders[ARCHIVE_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; });
|
||||
|
||||
var inboxId = await wellKnownFolderIdBatch.AddBatchRequestStepAsync(inboxRequest);
|
||||
var sentId = await wellKnownFolderIdBatch.AddBatchRequestStepAsync(sentRequest);
|
||||
var deletedId = await wellKnownFolderIdBatch.AddBatchRequestStepAsync(deletedRequest);
|
||||
var junkId = await wellKnownFolderIdBatch.AddBatchRequestStepAsync(junkRequest);
|
||||
var draftsId = await wellKnownFolderIdBatch.AddBatchRequestStepAsync(draftsRequest);
|
||||
var archiveId = await wellKnownFolderIdBatch.AddBatchRequestStepAsync(archiveRequest);
|
||||
|
||||
var returnedResponse = await _graphClient.Batch.PostAsync(wellKnownFolderIdBatch, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var inboxFolderId = (await returnedResponse.GetResponseByIdAsync<MailFolder>(inboxId)).Id;
|
||||
var sentFolderId = (await returnedResponse.GetResponseByIdAsync<MailFolder>(sentId)).Id;
|
||||
var deletedFolderId = (await returnedResponse.GetResponseByIdAsync<MailFolder>(deletedId)).Id;
|
||||
var junkFolderId = (await returnedResponse.GetResponseByIdAsync<MailFolder>(junkId)).Id;
|
||||
var draftsFolderId = (await returnedResponse.GetResponseByIdAsync<MailFolder>(draftsId)).Id;
|
||||
var archiveFolderId = (await returnedResponse.GetResponseByIdAsync<MailFolder>(archiveId)).Id;
|
||||
|
||||
var specialFolderInfo = new OutlookSpecialFolderIdInformation(inboxFolderId, deletedFolderId, junkFolderId, draftsFolderId, sentFolderId, archiveFolderId);
|
||||
|
||||
Microsoft.Graph.Me.MailFolders.Delta.DeltaGetResponse graphFolders = null;
|
||||
|
||||
if (string.IsNullOrEmpty(Account.SynchronizationDeltaIdentifier))
|
||||
{
|
||||
// Initial folder sync.
|
||||
|
||||
var deltaRequest = _graphClient.Me.MailFolders.Delta.ToGetRequestInformation();
|
||||
|
||||
deltaRequest.UrlTemplate = deltaRequest.UrlTemplate.Insert(deltaRequest.UrlTemplate.Length - 1, ",includehiddenfolders");
|
||||
deltaRequest.QueryParameters.Add("includehiddenfolders", "true");
|
||||
|
||||
graphFolders = await _graphClient.RequestAdapter.SendAsync(deltaRequest,
|
||||
Microsoft.Graph.Me.MailFolders.Delta.DeltaGetResponse.CreateFromDiscriminatorValue,
|
||||
cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
var currentDeltaLink = Account.SynchronizationDeltaIdentifier;
|
||||
|
||||
var deltaRequest = _graphClient.Me.MailFolders.Delta.ToGetRequestInformation();
|
||||
|
||||
deltaRequest.UrlTemplate = deltaRequest.UrlTemplate.Insert(deltaRequest.UrlTemplate.Length - 1, ",%24deltaToken");
|
||||
deltaRequest.QueryParameters.Add("%24deltaToken", currentDeltaLink);
|
||||
|
||||
try
|
||||
{
|
||||
graphFolders = await _graphClient.RequestAdapter.SendAsync(deltaRequest,
|
||||
Microsoft.Graph.Me.MailFolders.Delta.DeltaGetResponse.CreateFromDiscriminatorValue,
|
||||
cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (ApiException apiException) when (apiException.ResponseStatusCode == 410)
|
||||
{
|
||||
Account.SynchronizationDeltaIdentifier = string.Empty;
|
||||
|
||||
goto retry;
|
||||
}
|
||||
}
|
||||
|
||||
var iterator = PageIterator<MailFolder, Microsoft.Graph.Me.MailFolders.Delta.DeltaGetResponse>.CreatePageIterator(_graphClient, graphFolders, (folder) =>
|
||||
{
|
||||
return HandleFolderRetrievedAsync(folder, specialFolderInfo, cancellationToken);
|
||||
});
|
||||
var iterator = PageIterator<MailFolder, Microsoft.Graph.Me.MailFolders.Delta.DeltaGetResponse>
|
||||
.CreatePageIterator(_graphClient, graphFolders, (folder) =>
|
||||
HandleFolderRetrievedAsync(folder, specialFolderInfo, cancellationToken));
|
||||
|
||||
await iterator.IterateAsync();
|
||||
|
||||
if (!string.IsNullOrEmpty(iterator.Deltalink))
|
||||
await UpdateDeltaSynchronizationIdentifierAsync(iterator.Deltalink).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task<T> DeserializeGraphBatchResponseAsync<T>(BatchResponseContentCollection collection, string requestId, CancellationToken cancellationToken = default) where T : IParsable, new()
|
||||
{
|
||||
// This deserialization may throw generalException in case of failure.
|
||||
// Bug: https://github.com/microsoftgraph/msgraph-sdk-dotnet/issues/2010
|
||||
// This is a workaround for the bug to retrieve the actual exception.
|
||||
// All generic batch response deserializations must go under this method.
|
||||
|
||||
try
|
||||
{
|
||||
// Get the second part of the query that its the deltaToken
|
||||
var deltaToken = iterator.Deltalink.Split('=')[1];
|
||||
return await collection.GetResponseByIdAsync<T>(requestId);
|
||||
}
|
||||
catch (ODataError)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (ServiceException serviceException)
|
||||
{
|
||||
// Actual exception is hidden inside ServiceException.
|
||||
|
||||
var latestAccountDeltaToken = await _outlookChangeProcessor.UpdateAccountDeltaSynchronizationIdentifierAsync(Account.Id, deltaToken);
|
||||
|
||||
if (!string.IsNullOrEmpty(latestAccountDeltaToken))
|
||||
{
|
||||
Account.SynchronizationDeltaIdentifier = latestAccountDeltaToken;
|
||||
}
|
||||
ODataError errorResult = await KiotaJsonSerializer.DeserializeAsync<ODataError>(serviceException.RawResponseBody, cancellationToken);
|
||||
|
||||
throw new SynchronizerException("Outlook Error", errorResult);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<OutlookSpecialFolderIdInformation> GetSpecialFolderIdsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var wellKnownFolderIdBatch = new BatchRequestContentCollection(_graphClient);
|
||||
var folderRequests = new Dictionary<string, RequestInformation>
|
||||
{
|
||||
{ INBOX_NAME, _graphClient.Me.MailFolders[INBOX_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; }) },
|
||||
{ SENT_NAME, _graphClient.Me.MailFolders[SENT_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; }) },
|
||||
{ DELETED_NAME, _graphClient.Me.MailFolders[DELETED_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; }) },
|
||||
{ JUNK_NAME, _graphClient.Me.MailFolders[JUNK_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; }) },
|
||||
{ DRAFTS_NAME, _graphClient.Me.MailFolders[DRAFTS_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; }) },
|
||||
{ ARCHIVE_NAME, _graphClient.Me.MailFolders[ARCHIVE_NAME].ToGetRequestInformation((t) => { t.QueryParameters.Select = ["id"]; }) }
|
||||
};
|
||||
|
||||
var batchIds = new Dictionary<string, string>();
|
||||
foreach (var request in folderRequests)
|
||||
{
|
||||
batchIds[request.Key] = await wellKnownFolderIdBatch.AddBatchRequestStepAsync(request.Value);
|
||||
}
|
||||
|
||||
var returnedResponse = await _graphClient.Batch.PostAsync(wellKnownFolderIdBatch, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var folderIds = new Dictionary<string, string>();
|
||||
foreach (var batchId in batchIds)
|
||||
{
|
||||
folderIds[batchId.Key] = (await DeserializeGraphBatchResponseAsync<MailFolder>(returnedResponse, batchId.Value, cancellationToken)).Id;
|
||||
}
|
||||
|
||||
return new OutlookSpecialFolderIdInformation(
|
||||
folderIds[INBOX_NAME],
|
||||
folderIds[DELETED_NAME],
|
||||
folderIds[JUNK_NAME],
|
||||
folderIds[DRAFTS_NAME],
|
||||
folderIds[SENT_NAME],
|
||||
folderIds[ARCHIVE_NAME]);
|
||||
}
|
||||
|
||||
private async Task<Microsoft.Graph.Me.MailFolders.Delta.DeltaGetResponse> GetDeltaFoldersAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (string.IsNullOrEmpty(Account.SynchronizationDeltaIdentifier))
|
||||
{
|
||||
var deltaRequest = _graphClient.Me.MailFolders.Delta.ToGetRequestInformation();
|
||||
deltaRequest.UrlTemplate = deltaRequest.UrlTemplate.Insert(deltaRequest.UrlTemplate.Length - 1, ",includehiddenfolders");
|
||||
deltaRequest.QueryParameters.Add("includehiddenfolders", "true");
|
||||
|
||||
return await _graphClient.RequestAdapter.SendAsync(deltaRequest,
|
||||
Microsoft.Graph.Me.MailFolders.Delta.DeltaGetResponse.CreateFromDiscriminatorValue,
|
||||
cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var deltaRequest = _graphClient.Me.MailFolders.Delta.ToGetRequestInformation();
|
||||
deltaRequest.UrlTemplate = deltaRequest.UrlTemplate.Insert(deltaRequest.UrlTemplate.Length - 1, ",%24deltaToken");
|
||||
deltaRequest.QueryParameters.Add("%24deltaToken", Account.SynchronizationDeltaIdentifier);
|
||||
|
||||
return await _graphClient.RequestAdapter.SendAsync(deltaRequest,
|
||||
Microsoft.Graph.Me.MailFolders.Delta.DeltaGetResponse.CreateFromDiscriminatorValue,
|
||||
cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (ApiException apiException) when (apiException.ResponseStatusCode == 410)
|
||||
{
|
||||
Account.SynchronizationDeltaIdentifier = string.Empty;
|
||||
return await GetDeltaFoldersAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task UpdateDeltaSynchronizationIdentifierAsync(string deltalink)
|
||||
{
|
||||
if (string.IsNullOrEmpty(deltalink)) return;
|
||||
|
||||
var deltaToken = deltalink.Split('=')[1];
|
||||
var latestAccountDeltaToken = await _outlookChangeProcessor
|
||||
.UpdateAccountDeltaSynchronizationIdentifierAsync(Account.Id, deltaToken);
|
||||
|
||||
if (!string.IsNullOrEmpty(latestAccountDeltaToken))
|
||||
{
|
||||
Account.SynchronizationDeltaIdentifier = latestAccountDeltaToken;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -866,103 +894,117 @@ public class OutlookSynchronizer : WinoSynchronizer<RequestInformation, Message,
|
||||
|
||||
public override async Task ExecuteNativeRequestsAsync(List<IRequestBundle<RequestInformation>> batchedRequests, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var batchRequestInformations = batchedRequests.Batch((int)MaximumAllowedBatchRequestSize);
|
||||
var batchedGroups = batchedRequests.Batch((int)MaximumAllowedBatchRequestSize);
|
||||
|
||||
bool serializeRequests = false;
|
||||
|
||||
foreach (var batch in batchRequestInformations)
|
||||
foreach (var batch in batchedGroups)
|
||||
{
|
||||
var batchContent = new BatchRequestContentCollection(_graphClient);
|
||||
await ExecuteBatchRequestsAsync(batch, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
var itemCount = batch.Count();
|
||||
private async Task ExecuteBatchRequestsAsync(IEnumerable<IRequestBundle<RequestInformation>> batch, CancellationToken cancellationToken)
|
||||
{
|
||||
var batchContent = new BatchRequestContentCollection(_graphClient);
|
||||
var itemCount = batch.Count();
|
||||
|
||||
for (int i = 0; i < itemCount; i++)
|
||||
if (itemCount == 0) return;
|
||||
|
||||
var bundleIdMap = await PrepareBatchContentAsync(batch, batchContent, itemCount);
|
||||
|
||||
// Execute batch to collect responses from network call
|
||||
var batchRequestResponse = await _graphClient.Batch.PostAsync(batchContent, cancellationToken);
|
||||
|
||||
await ProcessBatchResponsesAsync(batch, batchRequestResponse, bundleIdMap);
|
||||
}
|
||||
|
||||
private async Task<Dictionary<string, IRequestBundle<RequestInformation>>> PrepareBatchContentAsync(
|
||||
IEnumerable<IRequestBundle<RequestInformation>> batch,
|
||||
BatchRequestContentCollection batchContent,
|
||||
int itemCount)
|
||||
{
|
||||
var bundleIdMap = new Dictionary<string, IRequestBundle<RequestInformation>>();
|
||||
bool requiresSerial = false;
|
||||
|
||||
for (int i = 0; i < itemCount; i++)
|
||||
{
|
||||
var bundle = batch.ElementAt(i);
|
||||
requiresSerial |= bundle.UIChangeRequest is SendDraftRequest;
|
||||
|
||||
bundle.UIChangeRequest?.ApplyUIChanges();
|
||||
var batchRequestId = await batchContent.AddBatchRequestStepAsync(bundle.NativeRequest);
|
||||
bundle.BundleId = batchRequestId;
|
||||
bundleIdMap[batchRequestId] = bundle;
|
||||
}
|
||||
|
||||
if (requiresSerial)
|
||||
{
|
||||
ConfigureSerialExecution(batchContent);
|
||||
}
|
||||
|
||||
return bundleIdMap;
|
||||
}
|
||||
|
||||
private void ConfigureSerialExecution(BatchRequestContentCollection batchContent)
|
||||
{
|
||||
// Set each step to depend on previous one for serial execution
|
||||
var steps = batchContent.BatchRequestSteps.ToList();
|
||||
for (int i = 1; i < steps.Count; i++)
|
||||
{
|
||||
var currentStep = steps[i].Value;
|
||||
var previousStepKey = steps[i - 1].Key;
|
||||
currentStep.DependsOn = [previousStepKey];
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessBatchResponsesAsync(
|
||||
IEnumerable<IRequestBundle<RequestInformation>> batch,
|
||||
BatchResponseContentCollection batchResponse,
|
||||
Dictionary<string, IRequestBundle<RequestInformation>> bundleIdMap)
|
||||
{
|
||||
var errors = new List<string>();
|
||||
|
||||
foreach (var bundleId in bundleIdMap.Keys)
|
||||
{
|
||||
var bundle = bundleIdMap[bundleId];
|
||||
var response = await batchResponse.GetResponseByIdAsync(bundleId);
|
||||
|
||||
if (response == null) continue;
|
||||
|
||||
using (response)
|
||||
{
|
||||
var bundle = batch.ElementAt(i);
|
||||
|
||||
if (bundle.UIChangeRequest is SendDraftRequest)
|
||||
if (!response.IsSuccessStatusCode)
|
||||
{
|
||||
// This bundle needs to run every request in serial.
|
||||
// By default requests are executed in parallel.
|
||||
|
||||
serializeRequests = true;
|
||||
await HandleFailedResponseAsync(bundle, response, errors);
|
||||
}
|
||||
|
||||
var nativeRequest = bundle.NativeRequest;
|
||||
|
||||
bundle.UIChangeRequest?.ApplyUIChanges();
|
||||
|
||||
var batchRequestId = await batchContent.AddBatchRequestStepAsync(nativeRequest).ConfigureAwait(false);
|
||||
|
||||
// Map BundleId to batch request step's key.
|
||||
// This is how we can identify which step succeeded or failed in the bundle.
|
||||
|
||||
bundle.BundleId = batchRequestId;
|
||||
}
|
||||
|
||||
if (!batchContent.BatchRequestSteps.Any())
|
||||
continue;
|
||||
|
||||
// Set execution type to serial instead of parallel if needed.
|
||||
// Each step will depend on the previous one.
|
||||
|
||||
if (serializeRequests)
|
||||
{
|
||||
for (int i = 1; i < itemCount; i++)
|
||||
{
|
||||
var currentStep = batchContent.BatchRequestSteps.ElementAt(i);
|
||||
var previousStep = batchContent.BatchRequestSteps.ElementAt(i - 1);
|
||||
|
||||
currentStep.Value.DependsOn = [previousStep.Key];
|
||||
}
|
||||
}
|
||||
|
||||
// Execute batch. This will collect responses from network call for each batch step.
|
||||
var batchRequestResponse = await _graphClient.Batch.PostAsync(batchContent, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Check responses for each bundle id.
|
||||
// Each bundle id must return some HttpResponseMessage ideally.
|
||||
|
||||
var bundleIds = batchContent.BatchRequestSteps.Select(a => a.Key);
|
||||
|
||||
var exceptionBag = new List<string>();
|
||||
|
||||
foreach (var bundleId in bundleIds)
|
||||
{
|
||||
var bundle = batch.FirstOrDefault(a => a.BundleId == bundleId);
|
||||
|
||||
if (bundle == null)
|
||||
continue;
|
||||
|
||||
var httpResponseMessage = await batchRequestResponse.GetResponseByIdAsync(bundleId);
|
||||
|
||||
if (httpResponseMessage == null)
|
||||
continue;
|
||||
|
||||
using (httpResponseMessage)
|
||||
{
|
||||
if (!httpResponseMessage.IsSuccessStatusCode)
|
||||
{
|
||||
bundle.UIChangeRequest?.RevertUIChanges();
|
||||
|
||||
var content = await httpResponseMessage.Content.ReadAsStringAsync();
|
||||
var errorJson = JsonNode.Parse(content);
|
||||
var errorString = $"[{httpResponseMessage.StatusCode}] {errorJson["error"]["code"]} - {errorJson["error"]["message"]}\n";
|
||||
|
||||
Debug.WriteLine(errorString);
|
||||
|
||||
exceptionBag.Add(errorString);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (exceptionBag.Any())
|
||||
{
|
||||
var formattedErrorString = string.Join("\n", exceptionBag.Select((item, index) => $"{index + 1}. {item}"));
|
||||
|
||||
throw new SynchronizerException(formattedErrorString);
|
||||
}
|
||||
}
|
||||
|
||||
if (errors.Any())
|
||||
{
|
||||
ThrowBatchExecutionException(errors);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleFailedResponseAsync(
|
||||
IRequestBundle<RequestInformation> bundle,
|
||||
HttpResponseMessage response,
|
||||
List<string> errors)
|
||||
{
|
||||
bundle.UIChangeRequest?.RevertUIChanges();
|
||||
|
||||
var content = await response.Content.ReadAsStringAsync();
|
||||
var errorJson = JsonNode.Parse(content);
|
||||
var errorString = $"[{response.StatusCode}] {errorJson["error"]["code"]} - {errorJson["error"]["message"]}\n";
|
||||
|
||||
Debug.WriteLine(errorString);
|
||||
errors.Add(errorString);
|
||||
}
|
||||
|
||||
private void ThrowBatchExecutionException(List<string> errors)
|
||||
{
|
||||
var formattedErrorString = string.Join("\n",
|
||||
errors.Select((item, index) => $"{index + 1}. {item}"));
|
||||
throw new SynchronizerException(formattedErrorString);
|
||||
}
|
||||
|
||||
public override async Task<List<MailCopy>> OnlineSearchAsync(string queryText, List<IMailItemFolder> folders, CancellationToken cancellationToken = default)
|
||||
|
||||
Reference in New Issue
Block a user