diff --git a/Cognite.Testing/Mock/Events.cs b/Cognite.Testing/Mock/Events.cs
new file mode 100644
index 00000000..fad933e1
--- /dev/null
+++ b/Cognite.Testing/Mock/Events.cs
@@ -0,0 +1,253 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.Extractor.Common;
+using CogniteSdk;
+using Moq;
+using Xunit;
+
+namespace Cognite.Extractor.Testing.Mock
+{
+ ///
+ /// Mock implementation of the Events API.
+ ///
+ public class EventsMock
+ {
+ private long _nextId = 10000;
+ private readonly Dictionary _eventsByExternalId = new Dictionary();
+ private readonly Dictionary _eventsById = new Dictionary();
+
+ ///
+ /// All mocked events.
+ ///
+ public ICollection Events => _eventsById.Values;
+
+
+ ///
+ /// Mock an event, assigning it an ID.
+ ///
+ /// Event to add.
+ public void MockEvent(Event ev)
+ {
+ if (ev == null) throw new ArgumentNullException(nameof(ev));
+ ev.Id = _nextId++;
+ _eventsById[ev.Id] = ev;
+ if (!string.IsNullOrEmpty(ev.ExternalId))
+ {
+ _eventsByExternalId[ev.ExternalId] = ev;
+ }
+ }
+
+ ///
+ /// Remove a mocked event by its external ID, if it is present.
+ ///
+ /// External ID of event to remove.
+ public bool Remove(string externalId)
+ {
+ if (_eventsByExternalId.TryGetValue(externalId, out var ev))
+ {
+ _eventsByExternalId.Remove(externalId);
+ _eventsById.Remove(ev.Id);
+ return true;
+ }
+ return false;
+ }
+
+ ///
+ /// Mock an event with the given external ID, assigning it an ID.
+ ///
+ /// External ID of the event to add.
+ public void MockEvent(string externalId)
+ {
+ MockEvent(new Event
+ {
+ ExternalId = externalId,
+ Type = "someType",
+ StartTime = DateTime.UtcNow.ToUnixTimeMilliseconds(),
+ EndTime = DateTime.UtcNow.ToUnixTimeMilliseconds(),
+ CreatedTime = DateTime.UtcNow.ToUnixTimeMilliseconds(),
+ LastUpdatedTime = DateTime.UtcNow.ToUnixTimeMilliseconds(),
+ });
+ }
+
+ ///
+ /// Get an event by its identity, if it exists.
+ ///
+ /// Event ID
+ /// The event, if it exists.
+ public Event? GetEvent(Identity id)
+ {
+ if (id == null) throw new ArgumentNullException(nameof(id));
+
+ if (id.Id.HasValue && _eventsById.TryGetValue(id.Id.Value, out var ev))
+ {
+ return ev;
+ }
+ else if (!string.IsNullOrEmpty(id.ExternalId) && _eventsByExternalId.TryGetValue(id.ExternalId, out var ev2))
+ {
+ return ev2;
+ }
+
+ return null;
+ }
+ ///
+ /// Get an event by its external ID, if it exists.
+ ///
+ /// Event external ID
+ /// The event, if it exists.
+
+ public Event? GetEvent(string externalId)
+ {
+ if (externalId == null) throw new ArgumentNullException(nameof(externalId));
+
+ if (_eventsByExternalId.TryGetValue(externalId, out var ev))
+ {
+ return ev;
+ }
+
+ return null;
+ }
+
+ ///
+ /// Get an event by its internal ID, if it exists.
+ ///
+ /// Event ID
+ /// The event, if it exists.
+ public Event? GetEvent(long id)
+ {
+ if (_eventsById.TryGetValue(id, out var ev))
+ {
+ return ev;
+ }
+
+ return null;
+ }
+
+
+ ///
+ /// Clear the events mock, removing all mocked events.
+ ///
+ public void Clear()
+ {
+ _nextId = 10000;
+ _eventsByExternalId.Clear();
+ _eventsById.Clear();
+ }
+
+ ///
+ /// Get a matcher for the /events/byids endpoint.
+ ///
+ /// Expected number of executions.
+ public RequestMatcher MakeGetByIdsMatcher(Times times)
+ {
+ return new SimpleMatcher("POST", "/events/byids", EventsByIdsImpl, times);
+ }
+
+ ///
+ /// Get a matcher for the /events endpoint for creating events.
+ ///
+ /// Expected number of executions.
+ public RequestMatcher MakeCreateEventsMatcher(Times times)
+ {
+ return new SimpleMatcher("POST", "/events$", EventsCreateImpl, times);
+ }
+
+ private async Task EventsCreateImpl(RequestContext context, CancellationToken token)
+ {
+ var events = await context.ReadJsonBody>().ConfigureAwait(false);
+ Assert.NotNull(events);
+ var created = new List();
+ var conflict = new List();
+
+ foreach (var ev in events.Items)
+ {
+ if (ev.ExternalId != null && _eventsByExternalId.ContainsKey(ev.ExternalId))
+ {
+ conflict.Add(ev.ExternalId);
+ continue;
+ }
+
+ var newEvent = new Event
+ {
+ Id = _nextId++,
+ ExternalId = ev.ExternalId,
+ Type = ev.Type,
+ StartTime = ev.StartTime,
+ EndTime = ev.EndTime,
+ Source = ev.Source,
+ Description = ev.Description,
+ CreatedTime = DateTime.UtcNow.ToUnixTimeMilliseconds(),
+ LastUpdatedTime = DateTime.UtcNow.ToUnixTimeMilliseconds(),
+ Metadata = ev.Metadata,
+ };
+
+ _eventsById[newEvent.Id] = newEvent;
+ if (newEvent.ExternalId != null)
+ {
+ _eventsByExternalId[newEvent.ExternalId] = newEvent;
+ }
+ created.Add(newEvent);
+ }
+
+ if (conflict.Count > 0)
+ {
+ return context.CreateError(new CogniteError
+ {
+ Code = 409,
+ Message = "Conflict",
+ Duplicated = conflict.Distinct().Select(id => MockUtils.ToMultiValueDict(new Identity(id))).ToList()
+ });
+ }
+
+ return context.CreateJsonResponse(new ItemsWithoutCursor { Items = created });
+ }
+
+ private async Task EventsByIdsImpl(RequestContext context, CancellationToken token)
+ {
+ var ids = await context.ReadJsonBody>().ConfigureAwait(false);
+ Assert.NotNull(ids);
+ var found = new List();
+ var missing = new List();
+
+ foreach (var id in ids.Items)
+ {
+ Event? ev = null;
+ if (id.Id.HasValue)
+ {
+ _eventsById.TryGetValue(id.Id.Value, out ev);
+ }
+ else if (!string.IsNullOrEmpty(id.ExternalId))
+ {
+ _eventsByExternalId.TryGetValue(id.ExternalId, out ev);
+ }
+
+ if (ev != null)
+ {
+ found.Add(ev);
+ }
+ else
+ {
+ missing.Add(id.ToIdentity());
+ }
+ }
+
+ if (!ids.IgnoreUnknownIds && missing.Count > 0)
+ {
+ return context.CreateError(new CogniteError
+ {
+ Code = 400,
+ Message = "Events not found",
+ Missing = missing.Distinct().Select(MockUtils.ToMultiValueDict).ToList(),
+ });
+ }
+
+ return context.CreateJsonResponse(new ItemsWithoutCursor
+ {
+ Items = found
+ });
+ }
+ }
+}
\ No newline at end of file
diff --git a/Cognite.Testing/Mock/MockUtils.cs b/Cognite.Testing/Mock/MockUtils.cs
new file mode 100644
index 00000000..7bf25bd2
--- /dev/null
+++ b/Cognite.Testing/Mock/MockUtils.cs
@@ -0,0 +1,38 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using CogniteSdk;
+
+namespace Cognite.Extractor.Testing.Mock
+{
+ ///
+ /// Common utilities for mock implementations of CDF APIs.
+ ///
+ public class MockUtils
+ {
+ ///
+ /// Convert an Identity to a dictionary used in error responses.
+ ///
+ /// Identity to convert
+ /// Multivalue dictionary
+ public static Dictionary ToMultiValueDict(Identity id)
+ {
+ if (id == null) throw new ArgumentNullException(nameof(id));
+ var dict = new Dictionary();
+ if (id.Id.HasValue)
+ {
+ dict["id"] = MultiValue.Create(id.Id.Value);
+ }
+ else if (id.InstanceId != null)
+ {
+ dict["instanceId"] = MultiValue.Create(id.InstanceId);
+ }
+ else if (!string.IsNullOrEmpty(id.ExternalId))
+ {
+ dict["externalId"] = MultiValue.Create(id.ExternalId);
+ }
+ return dict;
+ }
+ }
+}
\ No newline at end of file
diff --git a/Cognite.Testing/Mock/TimeSeries.cs b/Cognite.Testing/Mock/TimeSeries.cs
index e4a9ae85..481b1d97 100644
--- a/Cognite.Testing/Mock/TimeSeries.cs
+++ b/Cognite.Testing/Mock/TimeSeries.cs
@@ -197,24 +197,6 @@ public RequestMatcher MakeCreateDatapointsMatcher(Times times)
return new SimpleMatcher("POST", "/timeseries/data", CreateDatapointsImpl, times);
}
- private static Dictionary ToMultiValueDict(Identity id)
- {
- var dict = new Dictionary();
- if (id.Id.HasValue)
- {
- dict["id"] = MultiValue.Create(id.Id.Value);
- }
- else if (id.InstanceId != null)
- {
- dict["instanceId"] = MultiValue.Create(id.InstanceId);
- }
- else if (!string.IsNullOrEmpty(id.ExternalId))
- {
- dict["externalId"] = MultiValue.Create(id.ExternalId);
- }
- return dict;
- }
-
private async Task TimeSeriesByIdsImpl(RequestContext context, CancellationToken token)
{
var ids = await context.ReadJsonBody>().ConfigureAwait(false);
@@ -240,7 +222,7 @@ private async Task TimeSeriesByIdsImpl(RequestContext conte
{
Code = 400,
Message = "Timeseries not found",
- Missing = missing.Distinct().Select(ToMultiValueDict).ToList(),
+ Missing = missing.Distinct().Select(MockUtils.ToMultiValueDict).ToList(),
});
}
return context.CreateJsonResponse(new ItemsWithoutCursor
@@ -302,7 +284,7 @@ private async Task CreateDatapointsImpl(RequestContext cont
{
Code = 400,
Message = "Timeseries not found",
- Missing = missing.Distinct().Select(ToMultiValueDict).ToList(),
+ Missing = missing.Distinct().Select(MockUtils.ToMultiValueDict).ToList(),
});
}
if (mismatchedExpected != null)
diff --git a/ExtractorUtils.Test/unit/EventTest.cs b/ExtractorUtils.Test/unit/EventTest.cs
index 62353894..a83c29c3 100644
--- a/ExtractorUtils.Test/unit/EventTest.cs
+++ b/ExtractorUtils.Test/unit/EventTest.cs
@@ -19,6 +19,8 @@
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
+using Cognite.Extractor.Testing.Mock;
+using Moq;
namespace ExtractorUtils.Test.Unit
{
@@ -27,8 +29,6 @@ public class EventTest
private const string _project = "someProject";
private const string _host = "https://test.cognitedata.com";
- private bool _failInsert;
-
private readonly ITestOutputHelper _output;
public EventTest(ITestOutputHelper output)
{
@@ -44,17 +44,9 @@ public EventTest(ITestOutputHelper output)
[InlineData("id11", "id12", "duplicated3", "id13", "duplicated4")]
[InlineData("id14", "missing5", "id15", "duplicated5", "missing6", "duplicated6")]
[InlineData("id16", "id17", "missing7", "duplicated7-2", "duplicated8-4", "duplicated9-3")]
- ///
- /// External ids starting with 'id' exist in the mocked endpoint.
- /// External ids starting with 'missing' do not exist, but can be successfully created.
- /// External ids starting with 'duplicated' do not exist, and fail during creation as duplicated.
- /// Duplicated with a suffix '-N', where N is an int will be reported by the endpoint as duplicated
- /// a total of N times.
- ///
- ///
- ///
public async Task TestEnsureEvents(params string[] ids)
{
+ if (ids == null) throw new ArgumentNullException(nameof(ids));
string path = "test-ensure-events-config.yml";
string[] lines = { "version: 2",
"logger:",
@@ -69,20 +61,28 @@ public async Task TestEnsureEvents(params string[] ids)
" events: 2" };
System.IO.File.WriteAllLines(path, lines);
- var mocks = TestUtilities.GetMockedHttpClientFactory(mockEnsureEventsSendAsync);
- var mockHttpMessageHandler = mocks.handler;
- var mockFactory = mocks.factory;
-
// Setup services
var services = new ServiceCollection();
- services.AddSingleton(mockFactory.Object); // inject the mock factory
services.AddConfig(path, 2);
services.AddTestLogging(_output);
+ CdfMock.RegisterHttpClient(services);
services.AddCogniteClient("testApp");
using (var provider = services.BuildServiceProvider())
{
var cogniteDestination = provider.GetRequiredService();
+ var mock = provider.GetRequiredService();
+ var events = new EventsMock();
+
+ foreach (var id in ids)
+ {
+ if (id.StartsWith("duplicated"))
+ {
+ events.MockEvent(id);
+ }
+ }
+ mock.AddMatcher(events.MakeCreateEventsMatcher(Times.AtLeast(1)));
+ mock.AddMatcher(events.MakeGetByIdsMatcher(Times.AtLeast(1)));
Func, IEnumerable> createFunction =
(idxs) =>
{
@@ -103,10 +103,16 @@ public async Task TestEnsureEvents(params string[] ids)
SanitationMode.Remove,
CancellationToken.None
);
- Assert.Equal(ids?.Length, ts.Results.Where(t => ids.Contains(t.ExternalId)).Count());
- foreach (var t in ts.Results)
+ Assert.Equal(ids.Length, ts.Results.Count());
+ Assert.Equal(events.Events.Count, ids.Length);
+ events.Clear();
+
+ foreach (var id in ids)
{
- _ensuredEvents.Remove(t.ExternalId, out _);
+ if (id.StartsWith("duplicated"))
+ {
+ events.MockEvent(id);
+ }
}
var newEvents = createFunction(ids);
@@ -115,8 +121,7 @@ public async Task TestEnsureEvents(params string[] ids)
// a timeout would fail the test
await cogniteDestination.EnsureEventsExistsAsync(newEvents, RetryMode.OnFatal, SanitationMode.Remove, source.Token);
}
- Assert.Equal(ids.Length, _ensuredEvents
- .Where(kvp => ids.Contains(kvp.Key)).Count());
+ Assert.Equal(ids.Length, events.Events.Count);
}
System.IO.File.Delete(path);
@@ -139,14 +144,10 @@ public async Task TestUploadQueue()
" events: 2" };
System.IO.File.WriteAllLines(path, lines);
- var mocks = TestUtilities.GetMockedHttpClientFactory(mockEnsureEventsSendAsync);
- var mockHttpMessageHandler = mocks.handler;
- var mockFactory = mocks.factory;
-
var services = new ServiceCollection();
- services.AddSingleton(mockFactory.Object); // inject the mock factory
services.AddConfig(path, 2);
services.AddTestLogging(_output);
+ CdfMock.RegisterHttpClient(services);
services.AddCogniteClient("testApp", setLogger: true, setMetrics: false);
var index = 0;
@@ -158,6 +159,9 @@ public async Task TestUploadQueue()
{
var cogniteDestination = provider.GetRequiredService();
var logger = provider.GetRequiredService>();
+ var mock = provider.GetRequiredService();
+ var events = new EventsMock();
+ mock.AddMatcher(events.MakeCreateEventsMatcher(Times.Between(7, 13, Moq.Range.Inclusive)));
// queue with 1 sec upload interval
await using (var queue = cogniteDestination.CreateEventUploadQueue(TimeSpan.FromSeconds(1), 0, res =>
{
@@ -191,6 +195,8 @@ public async Task TestUploadQueue()
Assert.Equal(13, evtCount);
Assert.True(cbCount <= 3);
cbCount = 0;
+ mock.AssertAndClear();
+ mock.AddMatcher(events.MakeCreateEventsMatcher(Times.Exactly(6)));
// queue with maximum size
await using (var queue = cogniteDestination.CreateEventUploadQueue(TimeSpan.FromMinutes(10), 5, res =>
@@ -247,17 +253,15 @@ public async Task TestUploadQueueBuffer()
" cdf-chunking:",
" events: 2",
" cdf-throttling:",
- " events: 2" };
+ " events: 2",
+ " cdf-retries:",
+ " max-retries: 0" };
System.IO.File.WriteAllLines(path, lines);
- var mocks = TestUtilities.GetMockedHttpClientFactory(mockEnsureEventsSendAsync);
- var mockHttpMessageHandler = mocks.handler;
- var mockFactory = mocks.factory;
-
var services = new ServiceCollection();
- services.AddSingleton(mockFactory.Object); // inject the mock factory
services.AddConfig(path, 2);
services.AddTestLogging(_output);
+ CdfMock.RegisterHttpClient(services);
services.AddCogniteClient("testApp", setLogger: true, setMetrics: false);
System.IO.File.Create("event-buffer.bin").Close();
@@ -267,6 +271,11 @@ public async Task TestUploadQueueBuffer()
{
var cogniteDestination = provider.GetRequiredService();
var logger = provider.GetRequiredService>();
+ var mock = provider.GetRequiredService();
+ var events = new EventsMock();
+ // Just the successful inserts, 5 batches of 2
+ mock.AddMatcher(events.MakeCreateEventsMatcher(Times.Exactly(5)));
+ mock.AddTokenInspectEndpoint(Times.AtLeastOnce(), _project);
await using (var queue = cogniteDestination.CreateEventUploadQueue(TimeSpan.Zero, 0, null, "event-buffer.bin"))
{
var _ = queue.Start(source.Token);
@@ -279,18 +288,18 @@ public async Task TestUploadQueueBuffer()
EndTime = DateTime.UtcNow.ToUnixTimeMilliseconds()
});
}
- _failInsert = true;
+ mock.RejectAllMessages = true;
Assert.Equal(0, new FileInfo("event-buffer.bin").Length);
await queue.Trigger(CancellationToken.None);
Assert.True(new FileInfo("event-buffer.bin").Length > 0);
- Assert.Empty(_ensuredEvents);
+ Assert.Empty(events.Events);
await queue.Trigger(CancellationToken.None);
Assert.True(new FileInfo("event-buffer.bin").Length > 0);
- Assert.Empty(_ensuredEvents);
- _failInsert = false;
+ Assert.Empty(events.Events);
+ mock.RejectAllMessages = false;
await queue.Trigger(CancellationToken.None);
Assert.Equal(0, new FileInfo("event-buffer.bin").Length);
- Assert.Equal(10, _ensuredEvents.Count);
+ Assert.Equal(10, events.Events.Count);
logger.LogInformation("Disposing of the upload queue");
}
logger.LogInformation("Upload queue disposed");
@@ -298,139 +307,5 @@ public async Task TestUploadQueueBuffer()
System.IO.File.Delete("event-buffer.bin");
System.IO.File.Delete(path);
}
-
-
- #region mock
- private ConcurrentDictionary _ensuredEvents = new ConcurrentDictionary();
-
- private async Task mockEnsureEventsSendAsync(
- HttpRequestMessage message,
- CancellationToken token)
- {
- var uri = message.RequestUri.ToString();
- var responseBody = "";
-
- if (_failInsert)
- {
- dynamic failResponse = new ExpandoObject();
- failResponse.error = new ExpandoObject();
- failResponse.error.code = 500;
- failResponse.error.message = "Something went wrong";
-
- responseBody = JsonConvert.SerializeObject(failResponse);
- var fail = new HttpResponseMessage
- {
- StatusCode = HttpStatusCode.InternalServerError,
- Content = new StringContent(responseBody)
- };
- fail.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
- fail.Headers.Add("x-request-id", "1");
- return fail;
- }
-
- if (uri.Contains("/token/inspect"))
- {
- dynamic inspectResponse = new ExpandoObject();
- inspectResponse.projects = new List();
- dynamic project = new ExpandoObject();
- project.projectUrlName = _project;
- inspectResponse.projects.Add(project);
-
- responseBody = JsonConvert.SerializeObject(inspectResponse);
- var msg = new HttpResponseMessage
- {
- StatusCode = HttpStatusCode.OK,
- Content = new StringContent(responseBody)
- };
-
- msg.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
- msg.Headers.Add("x-request-id", "1");
- return msg;
- }
-
- var statusCode = HttpStatusCode.OK;
-
- var content = await message.Content.ReadAsStringAsync(token);
- var ids = JsonConvert.DeserializeObject(content);
- IEnumerable items = ids.items;
-
-
-
- if (uri.Contains("/events/byids"))
- {
- Assert.True((bool)ids.ignoreUnknownIds);
-
- dynamic result = new ExpandoObject();
- result.items = new List();
-
- foreach (var item in items)
- {
- string id = item.externalId;
- var ensured = _ensuredEvents.TryGetValue(id, out int countdown) && countdown <= 0;
- if (ensured || id.StartsWith("id"))
- {
- dynamic eventData = new ExpandoObject();
- eventData.externalId = id;
- result.items.Add(eventData);
- _ensuredEvents.TryAdd(id, 0);
- }
- }
- responseBody = JsonConvert.SerializeObject(result);
- }
- else
- {
- dynamic duplicateData = new ExpandoObject();
- duplicateData.error = new ExpandoObject();
- duplicateData.error.code = 409;
- duplicateData.error.message = "ExternalIds duplicated";
- duplicateData.error.duplicated = new List();
-
- dynamic result = new ExpandoObject();
- result.items = new List();
-
- foreach (var item in items)
- {
- string id = item.externalId;
- var hasValue = _ensuredEvents.TryGetValue(id, out int countdown);
- if ((!hasValue || countdown > 0) && id.StartsWith("duplicated"))
- {
- var splittedId = id.Split('-');
- var count = splittedId.Length == 2 ? int.Parse(splittedId[1]) - 1 : 0;
- dynamic duplicatedId = new ExpandoObject();
- duplicatedId.externalId = id;
- duplicateData.error.duplicated.Add(duplicatedId);
- _ensuredEvents[id] = hasValue ? countdown - 1 : count;
- }
- else
- {
- dynamic eventData = new ExpandoObject();
- eventData.externalId = id;
- result.items.Add(eventData);
- _ensuredEvents.TryAdd(id, 0);
- }
-
- }
- if (duplicateData.error.duplicated.Count > 0)
- {
- responseBody = JsonConvert.SerializeObject(duplicateData);
- statusCode = HttpStatusCode.Conflict;
- }
- else
- {
- responseBody = JsonConvert.SerializeObject(result);
- }
- }
-
- var response = new HttpResponseMessage
- {
- StatusCode = statusCode,
- Content = new StringContent(responseBody)
- };
- response.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
- response.Headers.Add("x-request-id", "1");
-
- return response;
- }
- #endregion
}
}