Skip to content

Commit 8a21e7f

Browse files
add some tracing
1 parent 828f8cc commit 8a21e7f

File tree

8 files changed

+113
-12
lines changed

8 files changed

+113
-12
lines changed

src/KurrentDB.Core.Tests/Services/Replication/ReplicationTestHelper.cs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,40 @@ public static ClientMessage.WriteEventsCompleted WriteEvent<TLogFormat, TStreamI
2222
string streamId) {
2323
var resetEvent = new ManualResetEventSlim();
2424
ClientMessage.WriteEventsCompleted writeResult = null;
25-
node.Node.MainQueue.Publish(new ClientMessage.WriteEvents(Guid.NewGuid(), Guid.NewGuid(),
25+
ClientMessage.WriteEvents writeRequest = default;
26+
writeRequest = new ClientMessage.WriteEvents(
27+
Guid.NewGuid(),
28+
Guid.NewGuid(),
2629
new CallbackEnvelope(msg => {
30+
if (writeRequest.Trace)
31+
writeRequest.AddTrace($"Envelope received reply: {msg}");
2732
writeResult = (ClientMessage.WriteEventsCompleted)msg;
2833
resetEvent.Set();
29-
}), false, streamId, -1, events,
30-
SystemAccounts.System, new Dictionary<string, string> {
34+
}),
35+
requireLeader: false, //qq should be leader but this might help make sure?
36+
streamId,
37+
-1,
38+
events,
39+
SystemAccounts.System,
40+
new Dictionary<string, string> {
3141
["uid"] = SystemUsers.Admin,
3242
["pwd"] = SystemUsers.DefaultAdminPassword
33-
}));
43+
}) {
44+
Trace = true
45+
};
46+
47+
node.Node.MainQueue.Publish(writeRequest);
48+
3449
if (!resetEvent.Wait(_timeout)) {
35-
Assert.Fail("Timed out waiting for event to be written");
50+
var joined = string.Join(Environment.NewLine, writeRequest.GetTraceMessages());
51+
Assert.Fail("Timed out waiting for event to be written: " + Environment.NewLine + joined);
3652
return null;
3753
}
3854

55+
//qq temp
56+
// var joinedTemp = string.Join(Environment.NewLine, writeRequest.GetTraceMessages());
57+
// Assert.Fail("Success: " + Environment.NewLine + joinedTemp);
58+
3959
return writeResult;
4060
}
4161

src/KurrentDB.Core/Bus/InMemoryBus.MessageHierarchy.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ public override async ValueTask InvokeAsync(Message message, CancellationToken t
127127

128128
// first handler is the parent
129129
foreach (var handler in _handlers) {
130+
if (message.Trace)
131+
message.AddTrace($"InMemoryBus calling handler {handler}");
130132
await handler.Invoke(Unsafe.As<T>(message), token);
131133
}
132134
}

src/KurrentDB.Core/Bus/QueuedHandlerThreadPool.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,13 @@ async void IThreadPoolWorkItem.Execute() {
135135
while (!_lifetimeToken.IsCancellationRequested && _queue.TryDequeue(out var item)) {
136136
var start = _tracker.RecordMessageDequeued(item.EnqueuedAt);
137137
var msg = item.Message;
138+
//qq maybe build this into the tracker?
139+
if (msg.Trace)
140+
msg.AddTrace($"Dequeued from {Name}");
138141
#if DEBUG
139142
_queueStats.Dequeued(msg);
140143
#endif
144+
141145
try {
142146
var queueCnt = _queue.Count;
143147
_queueStats.ProcessingStarted(msg.GetType(), queueCnt);
@@ -206,6 +210,10 @@ public void Publish(Message message) {
206210
_queueStats.Enqueued();
207211
#endif
208212
_queue.Enqueue(new(_tracker.Now, message));
213+
//qq maybe build this into the tracker?
214+
if (message.Trace)
215+
message.AddTrace($"Enqueued into {Name}");
216+
209217
if (!_lifetimeToken.IsCancellationRequested && Interlocked.CompareExchange(ref _isRunning, 1, 0) == 0) {
210218
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
211219
}

src/KurrentDB.Core/Messaging/Message.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
33

44
using System;
5+
using System.Collections.Generic;
56
using System.Threading;
67
using Newtonsoft.Json;
78

@@ -26,4 +27,40 @@ public DerivedMessageAttribute(object messageGroup) {
2627
/// <summary>
/// Base type for messages. Carries a cancellation token and, when <see cref="Trace"/> is
/// enabled, an ordered list of diagnostic trace entries appended through <see cref="AddTrace"/>.
/// </summary>
public abstract partial class Message(CancellationToken token = default) {
	[JsonIgnore]
	public CancellationToken CancellationToken => token;

	/// <summary>
	/// When true, <see cref="AddTrace"/> records entries; when false it is a no-op.
	/// </summary>
	[JsonIgnore]
	public bool Trace { get; init; }

	// Created lazily by the first AddTrace call. Once created, the list instance is also
	// the lock that guards its own contents, so no lock is taken on `this`.
	private List<object> _traceMessages;

	/// <summary>
	/// Flattens the recorded trace into text lines. String entries are emitted as-is; entries
	/// that are themselves messages contribute their own trace lines, each prefixed with the
	/// nested message's type name. Entries of any other type are ignored.
	/// </summary>
	/// <returns>The trace lines in recording order; empty when nothing was recorded.</returns>
	public string[] GetTraceMessages() {
		var lines = new List<string>();
		foreach (var entry in GetTrace()) {
			switch (entry) {
				case string text: {
					lines.Add(text);
					break;
				}
				case Message nested: {
					var nestedName = nested.GetType().Name;
					foreach (var nestedLine in nested.GetTraceMessages()) {
						lines.Add($" {nestedName}: {nestedLine}");
					}
					break;
				}
			}
		}
		return lines.ToArray();
	}

	/// <summary>
	/// Returns a snapshot of the raw trace entries.
	/// </summary>
	/// <returns>
	/// A copy of the entries recorded so far; an empty array when no entry was ever recorded
	/// (previously this case dereferenced the not-yet-created list and threw).
	/// </returns>
	public object[] GetTrace() {
		var entries = Volatile.Read(ref _traceMessages);
		if (entries is null) {
			return [];
		}

		lock (entries) {
			return entries.ToArray();
		}
	}

	/// <summary>
	/// Appends an entry to this message's trace. Does nothing unless <see cref="Trace"/> is true.
	/// </summary>
	/// <param name="message">The entry to record: typically a string or another message.</param>
	public void AddTrace(object message) {
		if (!Trace) {
			return;
		}

		var entries = Volatile.Read(ref _traceMessages);
		if (entries is null) {
			// Publish the list atomically so concurrent first callers agree on a single instance.
			var created = new List<object>();
			entries = Interlocked.CompareExchange(ref _traceMessages, created, null) ?? created;
		}

		lock (entries) {
			entries.Add(message);
		}
	}
}

src/KurrentDB.Core/Services/RequestManager/Managers/RequestManagerBase.cs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public abstract class RequestManagerBase :
4242
protected long FailureCurrentVersion = -1;
4343
protected long TransactionId;
4444

45+
private readonly Message _request;
4546
protected readonly CommitSource CommitSource;
4647
protected long LastEventPosition;
4748
protected bool Registered;
@@ -70,7 +71,8 @@ protected RequestManagerBase(
7071
CommitSource commitSource,
7172
int prepareCount = 0,
7273
long transactionId = -1,
73-
bool waitForCommit = false) {
74+
bool waitForCommit = false,
75+
Message request = default) {
7476
Ensure.NotEmptyGuid(internalCorrId, nameof(internalCorrId));
7577
Ensure.NotEmptyGuid(clientCorrId, nameof(clientCorrId));
7678
Ensure.NotNull(publisher, nameof(publisher));
@@ -87,6 +89,7 @@ protected RequestManagerBase(
8789
CommitSource = commitSource;
8890
_prepareCount = prepareCount;
8991
TransactionId = transactionId;
92+
_request = request;
9093
_commitReceived = !waitForCommit; //if not waiting for commit flag as true
9194
_allPreparesWritten = _prepareCount == 0; //if not waiting for prepares flag as true
9295
if (prepareCount == 0 && waitForCommit == false) {
@@ -102,11 +105,18 @@ protected RequestManagerBase(
102105
protected abstract Message ClientFailMsg { get; }
103106
/// <summary>
/// Arms the timeout for this request and publishes the write request message.
/// When the originating request has tracing enabled, the published message is
/// recorded in that request's trace first.
/// </summary>
public void Start() {
	NextTimeoutTime = DateTime.UtcNow + Timeout;
	var writeRequest = WriteRequestMsg;
	// The originating request is optional (the constructor parameter defaults to null),
	// so it must be null-checked before its Trace flag is read.
	if (_request is { Trace: true }) {
		_request.AddTrace(writeRequest);
	}
	Publisher.Publish(writeRequest);
}
107113

108114
public void Handle(StorageMessage.PrepareAck message) {
109115
if (Interlocked.Read(ref _complete) == 1 || _allPreparesWritten) { return; }
116+
117+
if (_request.Trace)
118+
_request.AddTrace($"RequestManager Received {message.GetType().Name}");
119+
110120
NextTimeoutTime = DateTime.UtcNow + Timeout;
111121
if (message.Flags.HasAnyOf(PrepareFlags.TransactionBegin)) {
112122
TransactionId = message.LogPosition;
@@ -125,6 +135,10 @@ public void Handle(StorageMessage.PrepareAck message) {
125135
}
126136
public virtual void Handle(StorageMessage.CommitIndexed message) {
127137
if (Interlocked.Read(ref _complete) == 1 || _commitReceived) { return; }
138+
139+
if (_request.Trace)
140+
_request.AddTrace($"RequestManager Received {message.GetType().Name}");
141+
128142
NextTimeoutTime = DateTime.UtcNow + Timeout;
129143
_commitReceived = true;
130144
_allEventsWritten = _commitReceived && _allPreparesWritten;
@@ -153,6 +167,8 @@ protected virtual void Committed() {
153167
Publisher.Publish(new StorageMessage.RequestCompleted(InternalCorrId, true));
154168
}
155169
public void Handle(StorageMessage.RequestManagerTimerTick message) {
170+
if (_request.Trace)
171+
_request.AddTrace($"RequestManager Received {message.GetType().Name}");
156172
if (_allEventsWritten) { AllEventsWritten(); }
157173
if (Interlocked.Read(ref _complete) == 1 || message.UtcNow < NextTimeoutTime)
158174
return;
@@ -161,16 +177,25 @@ public void Handle(StorageMessage.RequestManagerTimerTick message) {
161177
CompleteFailedRequest(result, msg);
162178
}
163179
/// <summary>
/// Fails the request with <c>OperationResult.InvalidTransaction</c>, recording receipt of the
/// message in the originating request's trace when tracing is enabled.
/// </summary>
/// <param name="message">The invalid-transaction notification.</param>
public void Handle(StorageMessage.InvalidTransaction message) {
	// The originating request is optional (may be null), so guard before reading Trace.
	if (_request is { Trace: true }) {
		_request.AddTrace($"RequestManager Received {message.GetType().Name}");
	}
	CompleteFailedRequest(OperationResult.InvalidTransaction, "Invalid transaction.");
}
166184
/// <summary>
/// Remembers the stream's current version and fails the request with
/// <c>OperationResult.WrongExpectedVersion</c>, recording receipt of the message in the
/// originating request's trace when tracing is enabled.
/// </summary>
/// <param name="message">The notification carrying the stream's current version.</param>
public void Handle(StorageMessage.WrongExpectedVersion message) {
	// The originating request is optional (may be null), so guard before reading Trace.
	if (_request is { Trace: true }) {
		_request.AddTrace($"RequestManager Received {message.GetType().Name}");
	}
	FailureCurrentVersion = message.CurrentVersion;
	CompleteFailedRequest(OperationResult.WrongExpectedVersion, "Wrong expected version.", message.CurrentVersion);
}
170190
/// <summary>
/// Fails the request with <c>OperationResult.StreamDeleted</c>, recording receipt of the
/// message in the originating request's trace when tracing is enabled.
/// </summary>
/// <param name="message">The stream-deleted notification.</param>
public void Handle(StorageMessage.StreamDeleted message) {
	// The originating request is optional (may be null), so guard before reading Trace.
	if (_request is { Trace: true }) {
		_request.AddTrace($"RequestManager Received {message.GetType().Name}");
	}
	CompleteFailedRequest(OperationResult.StreamDeleted, "Stream is deleted.");
}
173195
public void Handle(StorageMessage.AlreadyCommitted message) {
196+
if (_request.Trace)
197+
_request.AddTrace($"RequestManager Received {message.GetType().Name}");
198+
174199
if (Interlocked.Read(ref _complete) == 1 || _allEventsWritten) { return; }
175200
Log.Debug("IDEMPOTENT WRITE TO STREAM ClientCorrelationID {clientCorrelationId}, {message}.", ClientCorrId,
176201
message);
@@ -193,6 +218,8 @@ private void CompleteFailedRequest(OperationResult result, string error, long cu
193218
if (Interlocked.CompareExchange(ref _complete, 1, 0) == 1) { return; }
194219
Result = result;
195220
FailureMessage = error;
221+
if (_request.Trace)
222+
_request.AddTrace($"RequestManager Replying with RequestCompleted {result}");
196223
Publisher.Publish(new StorageMessage.RequestCompleted(InternalCorrId, false, currentVersion));
197224
_clientResponseEnvelope.ReplyWith(ClientFailMsg);
198225
}

src/KurrentDB.Core/Services/RequestManager/Managers/WriteEvents.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ public class WriteEvents : RequestManagerBase {
1414
private readonly string _streamId;
1515
private readonly Event[] _events;
1616
private readonly CancellationToken _cancellationToken;
17-
public WriteEvents(IPublisher publisher,
17+
public WriteEvents(
18+
IPublisher publisher,
1819
TimeSpan timeout,
1920
IEnvelope clientResponseEnvelope,
2021
Guid internalCorrId,
@@ -23,6 +24,7 @@ public WriteEvents(IPublisher publisher,
2324
long expectedVersion,
2425
Event[] events,
2526
CommitSource commitSource,
27+
ClientMessage.WriteEvents request = default,
2628
CancellationToken cancellationToken = default)
2729
: base(
2830
publisher,
@@ -33,7 +35,8 @@ public WriteEvents(IPublisher publisher,
3335
expectedVersion,
3436
commitSource,
3537
prepareCount: 0,
36-
waitForCommit: true) {
38+
waitForCommit: true,
39+
request: request) {
3740
_streamId = streamId;
3841
_events = events;
3942
_cancellationToken = cancellationToken;
@@ -46,7 +49,7 @@ public WriteEvents(IPublisher publisher,
4649
_streamId,
4750
ExpectedVersion,
4851
_events,
49-
_cancellationToken);
52+
_cancellationToken) { Trace = true };
5053

5154

5255
protected override Message ClientSuccessMsg =>

src/KurrentDB.Core/Services/RequestManager/RequestManagementService.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public void Handle(ClientMessage.WriteEvents message) {
6969
message.ExpectedVersion,
7070
message.Events,
7171
_commitSource,
72+
message,
7273
message.CancellationToken);
7374
_currentRequests.Add(message.InternalCorrId, manager);
7475
_currentTimedRequests.Add(message.InternalCorrId, Stopwatch.StartNew());

src/KurrentDB.Core/Services/VNode/VNodeFSM.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,11 @@ private static void EnsureActionType(Type expectedMessageType, [DisallowNull] Mu
100100
// Holds the per-state handlers. Only the first slot is declared here; the others are
// reached by offsetting from it.
// NOTE(review): presumably the struct is laid out with one Handler slot per VNodeState
// value - confirm against the declaration/attributes outside this view.
private struct HandlersBuffer {
	private Handler _handler;

	/// <summary>
	/// Invokes the handler located <paramref name="index"/> slots after the first one,
	/// adding a trace entry to the message beforehand when the message has tracing enabled.
	/// </summary>
	/// <param name="index">State whose integer value selects the handler slot.</param>
	/// <param name="message">Message to dispatch.</param>
	/// <param name="token">Cancellation token forwarded to the handler.</param>
	/// <returns>The task returned by the selected handler.</returns>
	public readonly ValueTask InvokeAsync(VNodeState index, Message message, CancellationToken token) {
		if (message.Trace) {
			message.AddTrace($"FSM handling in state {index}");
		}

		ref var selected = ref Unsafe.Add(ref Unsafe.AsRef(in _handler), (int)index);
		return selected.InvokeAsync(message, token);
	}
}
106109

107110
[StructLayout(LayoutKind.Auto)]

0 commit comments

Comments
 (0)