forked from datastax/nodejs-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream-id-stack.js
200 lines (183 loc) · 5.23 KB
/
stream-id-stack.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const types = require('./types');
/**
 * Number of bits an id is shifted to the right to obtain the index of the group that holds it
 * (groupIndex = id >> shiftToGroup).
 * @const
 * @type {number}
 */
const shiftToGroup = 7;
/**
 * Number of stream ids stored in each group (2 ^ shiftToGroup = 128).
 * @const
 * @type {number}
 */
const groupSize = 1 << shiftToGroup;
/**
 * Number of groups that are always retained and, at the same time, the maximum number of groups
 * released in a single pass: idle groups are only released while more than 4 groups (128 * 4 ids) exist.
 * @const
 * @type {number}
 */
const releasableSize = 4;
/**
 * Maximum number of groups when stream ids are 2 bytes long: 256 groups of 128 ids = 32K possible stream ids.
 * @const
 * @type {number}
 */
const maxGroupsFor2Bytes = 256;
/**
 * Default delay, in milliseconds, before checking whether idle groups can be released.
 * @const
 * @type {number}
 */
const defaultReleaseDelay = 5000;
/**
 * Represents a stack of stream ids, from 0 up to the maximum stream id supported by the protocol version.
 * Ids are stored in groups that are created lazily when the existing ones are exhausted, and trailing
 * groups that are no longer in use are released after a delay.
 * Clients obtain a stream id using {@link StreamIdStack#pop} and give it back (release) using
 * {@link StreamIdStack#push}.
 */
class StreamIdStack {
  /**
   * Creates a new instance of StreamIdStack.
   * @param {number} version Protocol version
   * @constructor
   */
  constructor(version) {
    //Ecmascript Number is 64-bit double, it can be optimized by the engine into a 32-bit int, but nothing below that.
    //We try to allocate as few as possible in arrays of 128
    this.currentGroup = generateGroup(0);
    this.groupIndex = 0;
    this.groups = [this.currentGroup];
    this.releaseTimeout = null;
    this.setVersion(version);
    /**
     * Returns the amount of ids currently in use
     * @member {number}
     */
    this.inUse = 0;
    this.releaseDelay = defaultReleaseDelay;
  }

  /**
   * Sets the protocol version, which determines the maximum number of groups that can be allocated.
   * @param {Number} version
   */
  setVersion(version) {
    //128 or 32K stream ids depending on the protocol version
    this.maxGroups = types.protocolVersion.uses2BytesStreamIds(version) ? maxGroupsFor2Bytes : 1;
  }

  /**
   * Dequeues an id.
   * Similar to {@link Array#pop()}.
   * @returns {Number} Returns an id or null when all the ids are in use and no more groups can be created
   */
  pop() {
    let id = this.currentGroup.pop();
    if (typeof id !== 'undefined') {
      this.inUse++;
      return id;
    }
    //The current group is exhausted: try to use the following groups
    while (this.groupIndex < this.groups.length - 1) {
      //move to the following group
      this.currentGroup = this.groups[++this.groupIndex];
      //try dequeue
      id = this.currentGroup.pop();
      if (typeof id !== 'undefined') {
        this.inUse++;
        return id;
      }
    }
    return this._tryCreateGroup();
  }

  /**
   * Enqueue an id for future use.
   * Similar to {@link Array#push()}.
   * @param {Number} id An id previously obtained using {@link StreamIdStack#pop}
   */
  push(id) {
    this.inUse--;
    const groupIndex = id >> shiftToGroup;
    const group = this.groups[groupIndex];
    group.push(id);
    if (groupIndex < this.groupIndex) {
      //Set the lower group to be used to dequeue from
      this.groupIndex = groupIndex;
      this.currentGroup = group;
    }
    this._tryIssueRelease();
  }

  /**
   * Clears all timers
   */
  clear() {
    if (this.releaseTimeout) {
      clearTimeout(this.releaseTimeout);
      this.releaseTimeout = null;
    }
  }

  /**
   * Tries to create an additional group and returns a new id
   * @returns {Number} Returns a new id or null if it's not possible to create a new group
   * @private
   */
  _tryCreateGroup() {
    //Use >= (not ===): setVersion() may have lowered maxGroups below the current number of groups,
    //in which case the limit must still be enforced
    if (this.groups.length >= this.maxGroups) {
      //we can't have an additional group
      return null;
    }
    //Add a new group at the last position
    this.groupIndex = this.groups.length;
    //Using 128 * groupIndex as initial value
    this.currentGroup = generateGroup(this.groupIndex << shiftToGroup);
    this.groups.push(this.currentGroup);
    this.inUse++;
    return this.currentGroup.pop();
  }

  /**
   * Schedules a delayed attempt to release unused groups, unless one is already scheduled or
   * there are not enough groups for any of them to be released.
   * @private
   */
  _tryIssueRelease() {
    if (this.releaseTimeout || this.groups.length <= releasableSize) {
      //Nothing to release or a release delay has been issued
      return;
    }
    this.releaseTimeout = setTimeout(() => this._releaseGroups(), this.releaseDelay);
  }

  /**
   * Removes trailing groups that have all their ids available, then schedules the next release if it applies.
   * It stops at the first group that is being used, and it never removes the group currently used to
   * dequeue from or any group before it.
   * @private
   */
  _releaseGroups() {
    let counter = 0;
    let index = this.groups.length - 1;
    //only release up to n groups (n = releasable size)
    //shrink back up to n groups not all the way up to 1
    while (counter++ < releasableSize && this.groups.length > releasableSize && index > this.groupIndex) {
      if (this.groups[index].length !== groupSize) {
        //the group is being used
        break;
      }
      this.groups.pop();
      index--;
    }
    this.releaseTimeout = null;
    //Issue next release if applies
    this._tryIssueRelease();
  }
}
/**
 * Builds a group of consecutive ids starting at the provided value.
 * The ids are stored in descending order, so Array#pop() on the group yields the lowest id first.
 * @param {number} initialValue The lowest id of the group
 * @returns {Array<number>} An array containing groupSize ids
 */
function generateGroup(initialValue) {
  const highestId = initialValue + groupSize - 1;
  return Array.from({ length: groupSize }, (_, offset) => highestId - offset);
}
module.exports = StreamIdStack;