forked from files-community/Files
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathClientContext.cs
More file actions
150 lines (125 loc) · 3.82 KB
/
ClientContext.cs
File metadata and controls
150 lines (125 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
namespace Files.App.Communication
{
/// <summary>
/// Per-client state: identity/authentication flags, a token-bucket rate limiter,
/// and a byte-capped, lossy outbound send queue. <see cref="LastSeenUtc"/> is tracked
/// as a settable timestamp (callers are expected to update it).
/// </summary>
public sealed class ClientContext : IDisposable
{
    // readonly fields

    // Guards _tokens and _lastRefill.
    private readonly object _rateLock = new();

    // Outbound messages waiting to be sent; exposed to the assembly through SendQueue.
    private readonly ConcurrentQueue<(string payload, bool isNotification, string? method)> _sendQueue = new();

    // Fields

    // UTF-8 byte total of the payloads accounted to the queue. Only touched through Interlocked.
    private long _queuedBytes = 0;

    // Tokens currently available in the bucket (never above IpcConfig.RateLimitBurst).
    private int _tokens;

    // Point in time up to which elapsed time has already been converted into tokens.
    private DateTime _lastRefill;

    // _disposed field
    private bool _disposed;

    // Properties

    /// <summary>Unique id generated when the context is created.</summary>
    public Guid Id { get; } = Guid.NewGuid();

    /// <summary>Optional free-form client description.</summary>
    public string? ClientInfo { get; set; }

    /// <summary>Whether the client is considered authenticated; set by callers.</summary>
    public bool IsAuthenticated { get; set; }

    /// <summary>Authentication epoch (set at handshake).</summary>
    public int AuthEpoch { get; set; } = 0; // set at handshake

    /// <summary>UTC timestamp of the last observed activity; initialised to the creation time.</summary>
    public DateTime LastSeenUtc { get; set; } = DateTime.UtcNow;

    /// <summary>Upper bound, in UTF-8 bytes, for payloads held in the send queue.</summary>
    public long MaxQueuedBytes { get; set; } = IpcConfig.PerClientQueueCapBytes;

    /// <summary>Cancellation source that <see cref="Dispose"/> cancels (it is not disposed here).</summary>
    public CancellationTokenSource? Cancellation { get; set; }

    /// <summary>WebSocket associated with this client, if any; disposed by <see cref="Dispose"/>.</summary>
    public WebSocket? WebSocket { get; set; }

    /// <summary>Opaque transport-specific handle.</summary>
    public object? TransportHandle { get; set; } // can store session id, pipe name, etc.

    /// <summary>The underlying outbound queue.</summary>
    internal ConcurrentQueue<(string payload, bool isNotification, string? method)> SendQueue => _sendQueue;

    // Constructor

    /// <summary>Creates a context whose bucket starts full (<c>IpcConfig.RateLimitBurst</c> tokens).</summary>
    public ClientContext()
    {
        _tokens = IpcConfig.RateLimitBurst;
        _lastRefill = DateTime.UtcNow;
    }

    // Public methods

    /// <summary>
    /// Converts the time elapsed since the last refill into whole tokens, capped at
    /// <c>IpcConfig.RateLimitBurst</c>. Time that does not yet amount to a whole token is
    /// carried over to the next call instead of being discarded.
    /// </summary>
    public void RefillTokens()
    {
        lock (_rateLock)
        {
            var now = DateTime.UtcNow;
            var elapsedSeconds = (now - _lastRefill).TotalSeconds;
            if (elapsedSeconds <= 0)
                return; // clock did not advance (or moved backwards): nothing to add

            var rate = IpcConfig.RateLimitPerSecond;
            var burst = IpcConfig.RateLimitBurst;

            // Stay in floating point until the value is known to fit, so a very long idle
            // period cannot overflow the int conversion or the addition below.
            var earned = Math.Floor(elapsedSeconds * rate);
            if (earned < 1)
                return;

            if (earned >= burst - _tokens)
            {
                // Bucket is full; surplus time buys nothing, so restart the clock at "now".
                _tokens = burst;
                _lastRefill = now;
            }
            else
            {
                _tokens += (int)earned;
                // Advance only by the time that was actually converted into tokens; the
                // fractional remainder keeps counting towards the next token.
                _lastRefill = _lastRefill.AddTicks((long)(earned / rate * TimeSpan.TicksPerSecond));
            }
        }
    }

    /// <summary>Refills the bucket and takes one token from it.</summary>
    /// <returns><c>true</c> if a token was consumed; <c>false</c> if the bucket is empty.</returns>
    public bool TryConsumeToken()
    {
        // Monitor locks are re-entrant, so refill and consume share one critical section.
        lock (_rateLock)
        {
            RefillTokens();
            if (_tokens <= 0)
                return false;
            _tokens--;
            return true;
        }
    }

    /// <summary>
    /// Tries to enqueue a payload under the byte cap. When the cap would be exceeded, queued
    /// notifications are dropped oldest-first (those with the same <paramref name="method"/>
    /// before any others); queued non-notification items are never dropped.
    /// </summary>
    /// <param name="payload">Message text; its UTF-8 byte count is charged against the cap.</param>
    /// <param name="isNotification">Marks the item as droppable by later enqueue attempts.</param>
    /// <param name="method">Optional method name, compared case-insensitively (ordinal).</param>
    /// <returns><c>true</c> if the payload was queued; <c>false</c> if it could not fit.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
    public bool TryEnqueue(string payload, bool isNotification, string? method = null)
    {
        var bytes = Encoding.UTF8.GetByteCount(payload);
        var cap = MaxQueuedBytes;

        // Nothing can make an over-sized payload fit; fail before evicting anything.
        if (bytes > cap)
            return false;

        // Reserve the space first so the counter already includes this payload.
        if (Interlocked.Add(ref _queuedBytes, bytes) > cap)
        {
            EvictNotifications(method, cap);

            // The counter already contains the reservation, so compare it to the cap as-is.
            if (Interlocked.Read(ref _queuedBytes) > cap)
            {
                // Release the reservation, otherwise the counter drifts upward forever.
                Interlocked.Add(ref _queuedBytes, -bytes);
                return false;
            }
        }

        SendQueue.Enqueue((payload, isNotification, method));
        return true;
    }

    // Internal methods

    /// <summary>Subtracts <paramref name="sentBytes"/> from the queued-byte counter.</summary>
    internal void DecreaseQueuedBytes(int sentBytes) => Interlocked.Add(ref _queuedBytes, -sentBytes);

    // Private methods

    // Drops queued notifications, oldest first, until the byte counter is within cap:
    // first those matching `method`, then any other notification. Survivors are put back in
    // their original relative order. NOTE: this is not atomic with respect to other threads
    // using SendQueue; items they add or remove meanwhile are not reordered by this method.
    private void EvictNotifications(string? method, long cap)
    {
        // Bound the drain by a snapshot so concurrent producers cannot keep the loop running.
        var snapshotCount = SendQueue.Count;
        var pending = new List<(string payload, bool isNotification, string? method)>(snapshotCount);
        while (pending.Count < snapshotCount && SendQueue.TryDequeue(out var queued))
            pending.Add(queued);

        var dropped = new bool[pending.Count];

        // Pass 0: same-method notifications only. Pass 1: any remaining notification.
        for (var pass = 0; pass < 2; pass++)
        {
            if (pass == 0 && method is null)
                continue;

            for (var i = 0; i < pending.Count; i++)
            {
                if (Interlocked.Read(ref _queuedBytes) <= cap)
                    break;

                var candidate = pending[i];
                if (dropped[i] || !candidate.isNotification)
                    continue; // already dropped, or a response that must be kept

                if (pass == 0 && !string.Equals(candidate.method, method, StringComparison.OrdinalIgnoreCase))
                    continue;

                dropped[i] = true;
                Interlocked.Add(ref _queuedBytes, -Encoding.UTF8.GetByteCount(candidate.payload));
            }
        }

        for (var i = 0; i < pending.Count; i++)
        {
            if (!dropped[i])
                SendQueue.Enqueue(pending[i]);
        }
    }

    // Dispose

    /// <summary>
    /// Cancels <see cref="Cancellation"/> and disposes <see cref="WebSocket"/>. Subsequent calls
    /// do nothing.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
            return;

        // Flag first so a re-entrant call (e.g. from a cancellation callback) is a no-op.
        _disposed = true;

        // Best-effort teardown: Dispose must not throw, so failures are deliberately ignored.
        try { Cancellation?.Cancel(); } catch { }
        try { WebSocket?.Dispose(); } catch { }
    }
}
}