Skip to content

Commit 50fc7df

Browse files
authored
Propagate DocumentClientException from ProcessChangesAsync (#127)
Fix DocumentClientExceptions happening in user code and affecting Processor logic
1 parent f78c72e commit 50fc7df

File tree

12 files changed

+400
-11
lines changed

12 files changed

+400
-11
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
//----------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation. Licensed under the MIT license.
3+
//----------------------------------------------------------------
4+
5+
using System;
6+
using System.IO;
7+
using System.Runtime.Serialization.Formatters.Binary;
8+
using Microsoft.Azure.Documents.ChangeFeedProcessor.Exceptions;
9+
using Xunit;
10+
11+
namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Exceptions
12+
{
13+
[Trait("Category", "Gated")]
14+
public class ObserverExceptionTests
15+
{
16+
[Fact]
17+
public void ValidateConstructor()
18+
{
19+
Exception exception = new Exception("randomMessage");
20+
var ex = new ObserverException(exception);
21+
Assert.Equal(exception.Message, ex.InnerException.Message);
22+
Assert.Equal(exception, ex.InnerException);
23+
}
24+
25+
// Tests the GetObjectData method and the serialization ctor.
26+
[Fact]
27+
public void ValidateSerialization_AllFields()
28+
{
29+
Exception exception = new Exception("randomMessage");
30+
var originalException = new ObserverException(exception);
31+
var buffer = new byte[4096];
32+
var formatter = new BinaryFormatter();
33+
var stream1 = new MemoryStream(buffer);
34+
var stream2 = new MemoryStream(buffer);
35+
36+
formatter.Serialize(stream1, originalException);
37+
var deserializedException = (ObserverException)formatter.Deserialize(stream2);
38+
39+
Assert.Equal(originalException.Message, deserializedException.Message);
40+
Assert.Equal(originalException.InnerException.Message, deserializedException.InnerException.Message);
41+
}
42+
43+
// Make sure that when some fields are not set, serialization is not broken.
44+
[Fact]
45+
public void ValidateSerialization_NullFields()
46+
{
47+
var originalException = new ObserverException(null);
48+
var buffer = new byte[4096];
49+
var formatter = new BinaryFormatter();
50+
var stream1 = new MemoryStream(buffer);
51+
var stream2 = new MemoryStream(buffer);
52+
53+
formatter.Serialize(stream1, originalException);
54+
var deserializedException = (ObserverException)formatter.Deserialize(stream2);
55+
56+
Assert.Equal(originalException.Message, deserializedException.Message);
57+
Assert.Null(deserializedException.InnerException);
58+
}
59+
}
60+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
//----------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation. Licensed under the MIT license.
3+
//----------------------------------------------------------------
4+
5+
namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.FeedProcessor
6+
{
7+
using System;
8+
using System.Collections.Generic;
9+
using System.Linq;
10+
using System.Threading;
11+
using System.Threading.Tasks;
12+
using Microsoft.Azure.Documents.ChangeFeedProcessor.Exceptions;
13+
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
14+
using Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Utils;
15+
using Moq;
16+
using Xunit;
17+
18+
[Trait("Category", "Gated")]
19+
public class ObserverExceptionWrappingChangeFeedObserverDecoratorTests
20+
{
21+
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
22+
private readonly IChangeFeedObserver observer;
23+
private readonly IChangeFeedObserverContext changeFeedObserverContext;
24+
private readonly FeedProcessing.ObserverExceptionWrappingChangeFeedObserverDecorator observerWrapper;
25+
private readonly List<Document> documents;
26+
27+
public ObserverExceptionWrappingChangeFeedObserverDecoratorTests()
28+
{
29+
this.observer = Mock.Of<IChangeFeedObserver>();
30+
this.changeFeedObserverContext = Mock.Of<IChangeFeedObserverContext>();
31+
this.observerWrapper = new FeedProcessing.ObserverExceptionWrappingChangeFeedObserverDecorator(observer);
32+
33+
var document = new Document();
34+
documents = new List<Document> { document };
35+
}
36+
37+
[Fact]
38+
public async Task OpenAsync_ShouldCallOpenAsync()
39+
{
40+
await observerWrapper.OpenAsync(this.changeFeedObserverContext);
41+
42+
Mock.Get(observer)
43+
.Verify(feedObserver => feedObserver
44+
.OpenAsync(It.IsAny<IChangeFeedObserverContext>()),
45+
Times.Once);
46+
}
47+
48+
[Fact]
49+
public async Task CloseAsync_ShouldCallCloseAsync()
50+
{
51+
await observerWrapper.CloseAsync(this.changeFeedObserverContext, ChangeFeedObserverCloseReason.Shutdown);
52+
53+
Mock.Get(observer)
54+
.Verify(feedObserver => feedObserver
55+
.CloseAsync(It.IsAny<IChangeFeedObserverContext>(),
56+
It.Is<ChangeFeedObserverCloseReason>(reason => reason == ChangeFeedObserverCloseReason.Shutdown)),
57+
Times.Once);
58+
}
59+
60+
[Fact]
61+
public async Task ProcessChangesAsync_ShouldPassDocumentsToProcessChangesAsync()
62+
{
63+
await observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, this.documents, cancellationTokenSource.Token);
64+
65+
Mock.Get(observer)
66+
.Verify(feedObserver => feedObserver
67+
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(),
68+
It.Is<IReadOnlyList<Document>>(list => list.SequenceEqual(documents)),
69+
It.IsAny<CancellationToken>()
70+
),
71+
Times.Once);
72+
}
73+
74+
[Fact]
75+
public async Task ProcessChangesAsync_ShouldThrow_IfObserverThrows()
76+
{
77+
Mock.Get(observer)
78+
.SetupSequence(feedObserver => feedObserver
79+
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(), It.IsAny<IReadOnlyList<Document>>(), It.IsAny<CancellationToken>()))
80+
.Throws(new Exception());
81+
82+
Exception exception = await Record.ExceptionAsync(() => observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, this.documents, cancellationTokenSource.Token));
83+
Assert.IsAssignableFrom<ObserverException>(exception);
84+
Assert.IsAssignableFrom<Exception>(exception.InnerException);
85+
86+
Mock.Get(observer)
87+
.Verify(feedObserver => feedObserver
88+
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(),
89+
It.Is<IReadOnlyList<Document>>(list => list.SequenceEqual(documents)),
90+
It.IsAny<CancellationToken>()
91+
),
92+
Times.Once);
93+
}
94+
95+
[Fact]
96+
public async Task ProcessChangesAsync_ShouldThrow_IfObserverThrowsDocumentClientException()
97+
{
98+
Mock.Get(observer)
99+
.SetupSequence(feedObserver => feedObserver
100+
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(), It.IsAny<IReadOnlyList<Document>>(), It.IsAny<CancellationToken>()))
101+
.Throws(DocumentExceptionHelpers.CreateRequestRateTooLargeException());
102+
103+
Exception exception = await Record.ExceptionAsync(() => observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, this.documents, cancellationTokenSource.Token));
104+
Assert.IsAssignableFrom<ObserverException>(exception);
105+
Assert.IsAssignableFrom<DocumentClientException>(exception.InnerException);
106+
107+
Mock.Get(observer)
108+
.Verify(feedObserver => feedObserver
109+
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(),
110+
It.Is<IReadOnlyList<Document>>(list => list.SequenceEqual(documents)),
111+
It.IsAny<CancellationToken>()
112+
),
113+
Times.Once);
114+
}
115+
}
116+
}

src/DocumentDB.ChangeFeedProcessor.UnitTests/FeedProcessor/PartitionProcessorTests.cs

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.FeedProcessor
1010
using System.Threading;
1111
using System.Threading.Tasks;
1212
using Microsoft.Azure.Documents.ChangeFeedProcessor.DataAccess;
13+
using Microsoft.Azure.Documents.ChangeFeedProcessor.Exceptions;
1314
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
1415
using Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Utils;
1516
using Microsoft.Azure.Documents.Client;
@@ -70,7 +71,7 @@ public PartitionProcessorTests()
7071

7172
observer = Mock.Of<IChangeFeedObserver>();
7273
var checkPointer = new Mock<IPartitionCheckpointer>();
73-
sut = new PartitionProcessor(observer, docClient, processorSettings, checkPointer.Object);
74+
sut = new PartitionProcessor(new FeedProcessing.ObserverExceptionWrappingChangeFeedObserverDecorator(observer), docClient, processorSettings, checkPointer.Object);
7475
}
7576

7677
[Fact]
@@ -175,6 +176,72 @@ public async Task Run_ShouldRetryQuery_IfDocDBThrowsCanceled()
175176
Times.Once);
176177
}
177178

179+
[Fact]
180+
public async Task Run_ShouldThrow_IfObserverThrows()
181+
{
182+
Mock.Get(documentQuery)
183+
.Reset();
184+
185+
Mock.Get(documentQuery)
186+
.SetupSequence(query => query.ExecuteNextAsync<Document>(It.Is<CancellationToken>(token => token == cancellationTokenSource.Token)))
187+
.ReturnsAsync(feedResponse);
188+
189+
Mock.Get(observer)
190+
.SetupSequence(feedObserver => feedObserver
191+
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(), It.IsAny<IReadOnlyList<Document>>(), It.IsAny<CancellationToken>()))
192+
.Throws(new CustomException())
193+
.Returns(Task.CompletedTask);
194+
195+
Exception exception = await Record.ExceptionAsync(() => sut.RunAsync(cancellationTokenSource.Token));
196+
Assert.IsAssignableFrom<ObserverException>(exception);
197+
Assert.IsAssignableFrom<CustomException>(exception.InnerException);
198+
199+
Mock.Get(documentQuery)
200+
.Verify(query => query.ExecuteNextAsync<Document>(It.Is<CancellationToken>(token => token == cancellationTokenSource.Token)), Times.Once);
201+
202+
Mock.Get(observer)
203+
.Verify(feedObserver => feedObserver
204+
.ProcessChangesAsync(
205+
It.Is<IChangeFeedObserverContext>(context => context.PartitionKeyRangeId == processorSettings.PartitionKeyRangeId),
206+
It.Is<IReadOnlyList<Document>>(list => list.SequenceEqual(documents)),
207+
It.IsAny<CancellationToken>()),
208+
Times.Once);
209+
}
210+
211+
[Fact]
212+
public async Task Run_ShouldThrow_IfObserverThrowsDocumentClientException()
213+
{
214+
// If the user code throws a DCE, we should bubble it up to stop the Observer and not treat it as a DCE from the Feed Query
215+
216+
Mock.Get(documentQuery)
217+
.Reset();
218+
219+
Mock.Get(documentQuery)
220+
.Setup(query => query.ExecuteNextAsync<Document>(It.Is<CancellationToken>(token => token == cancellationTokenSource.Token)))
221+
.ReturnsAsync(feedResponse)
222+
.Callback(() => cancellationTokenSource.Cancel());
223+
224+
Mock.Get(observer)
225+
.Setup(feedObserver => feedObserver
226+
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(), It.IsAny<IReadOnlyList<Document>>(), It.IsAny<CancellationToken>()))
227+
.Throws(DocumentExceptionHelpers.CreateRequestRateTooLargeException());
228+
229+
Exception exception = await Record.ExceptionAsync(() => sut.RunAsync(cancellationTokenSource.Token));
230+
Assert.IsAssignableFrom<ObserverException>(exception);
231+
Assert.IsAssignableFrom<DocumentClientException>(exception.InnerException);
232+
233+
Mock.Get(documentQuery)
234+
.Verify(query => query.ExecuteNextAsync<Document>(It.Is<CancellationToken>(token => token == cancellationTokenSource.Token)), Times.Once);
235+
236+
Mock.Get(observer)
237+
.Verify(feedObserver => feedObserver
238+
.ProcessChangesAsync(
239+
It.Is<IChangeFeedObserverContext>(context => context.PartitionKeyRangeId == processorSettings.PartitionKeyRangeId),
240+
It.Is<IReadOnlyList<Document>>(list => list.SequenceEqual(documents)),
241+
It.IsAny<CancellationToken>()),
242+
Times.Once);
243+
}
244+
178245
/// <summary>
179246
/// (1) Read normal feed
180247
/// (2) Get 400 with
@@ -250,5 +317,9 @@ public async Task Run_ShouldDecreaseMaxItemCountWhenNeeded()
250317

251318
Assert.Equal("token.token2.token3.", accumulator);
252319
}
320+
321+
private class CustomException : Exception
322+
{
323+
}
253324
}
254325
}

src/DocumentDB.ChangeFeedProcessor.UnitTests/PartitionManagement/PartitionSupervisorTests.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,21 @@ public async Task RunObserver_ShouldCancelRenewer_IfProcessorFailed()
110110
Assert.Equal("processorException", exception.Message);
111111
Assert.True(renewerTask.IsCanceled);
112112

113+
Mock.Get(observer)
114+
.Verify(feedObserver => feedObserver
115+
.CloseAsync(It.Is<ChangeFeedObserverContext>(context => context.PartitionKeyRangeId == lease.PartitionId),
116+
ChangeFeedObserverCloseReason.Unknown));
117+
}
118+
119+
[Fact]
120+
public async Task RunObserver_ShouldCloseWithObserverError_IfObserverFailed()
121+
{
122+
Mock.Get(partitionProcessor)
123+
.Setup(processor => processor.RunAsync(It.IsAny<CancellationToken>()))
124+
.ThrowsAsync(new ObserverException(new Exception()));
125+
126+
Exception exception = await Record.ExceptionAsync(() => sut.RunAsync(shutdownToken.Token)).ConfigureAwait(false);
127+
113128
Mock.Get(observer)
114129
.Verify(feedObserver => feedObserver
115130
.CloseAsync(It.Is<ChangeFeedObserverContext>(context => context.PartitionKeyRangeId == lease.PartitionId),

src/DocumentDB.ChangeFeedProcessor.UnitTests/Utils/DocumentExceptionHelpers.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Utils
1111
{
12-
public static class DocumentExceptionHelpers
12+
internal static class DocumentExceptionHelpers
1313
{
1414
public static Exception CreateNotFoundException()
1515
{
@@ -26,17 +26,21 @@ public static Exception CreateConflictException()
2626

2727
public static Exception CreateRequestRateTooLargeException()
2828
{
29-
return CreateException("Microsoft.Azure.Documents.RequestRateTooLargeException", 1);
29+
return CreateException("Microsoft.Azure.Documents.RequestRateTooLargeException", 1, "Request throttled", 100);
3030
}
3131

32-
public static Exception CreateException(string exceptionType, int subStatusCode, string message = "")
32+
public static Exception CreateException(string exceptionType, int subStatusCode, string message = "", int retryAfter = 0)
3333
{
3434
Type t = typeof(DocumentClientException)
3535
.Assembly.GetType(exceptionType);
3636

3737
HttpResponseHeaders httpResponseHeaders = CreateResponseHeaders();
3838
httpResponseHeaders.TryAddWithoutValidation("x-ms-substatus", subStatusCode.ToString());
3939
httpResponseHeaders.TryAddWithoutValidation("x-ms-activity-id", "activityId");
40+
if (retryAfter > 0)
41+
{
42+
httpResponseHeaders.TryAddWithoutValidation("x-ms-retry-after-ms", retryAfter.ToString());
43+
}
4044

4145
object ex = t.GetConstructor(
4246
BindingFlags.Public | BindingFlags.Instance,

src/DocumentDB.ChangeFeedProcessor/DocumentDB.ChangeFeedProcessor.csproj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<AssemblyName>Microsoft.Azure.Documents.ChangeFeedProcessor</AssemblyName>
2222

2323
<PackageId>Microsoft.Azure.DocumentDB.ChangeFeedProcessor</PackageId>
24-
<PackageVersion>2.2.5</PackageVersion>
24+
<PackageVersion>2.2.6</PackageVersion>
2525
<Title>Microsoft Azure Cosmos DB Change Feed Processor library</Title>
2626
<Authors>Microsoft</Authors>
2727
<PackageLicenseUrl>http://go.microsoft.com/fwlink/?LinkID=509837</PackageLicenseUrl>
@@ -44,9 +44,9 @@
4444
<!--CS1587:XML comment is not placed on a valid language element
4545
LibLog files have misplaced comments, but we cannot touch them.-->
4646
<NoWarn>1587</NoWarn>
47-
<Version>2.2.5</Version>
48-
<AssemblyVersion>2.2.5.0</AssemblyVersion>
49-
<FileVersion>2.2.5.0</FileVersion>
47+
<Version>2.2.6</Version>
48+
<AssemblyVersion>2.2.6.0</AssemblyVersion>
49+
<FileVersion>2.2.6.0</FileVersion>
5050
<PackageReleaseNotes>The change log for this project is available at https://docs.microsoft.com/azure/cosmos-db/sql-api-sdk-dotnet-changefeed.
5151
</PackageReleaseNotes>
5252
</PropertyGroup>

0 commit comments

Comments
 (0)