This repository has been archived by the owner on Jun 25, 2019. It is now read-only.
forked from mhart/kinesalite
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgetShardIterator.js
305 lines (255 loc) · 15.6 KB
/
getShardIterator.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
var async = require('async'),
BigNumber = require('bignumber.js'),
helpers = require('./helpers')
var target = 'GetShardIterator',
request = helpers.request,
randomName = helpers.randomName,
opts = helpers.opts.bind(null, target),
assertType = helpers.assertType.bind(null, target),
assertValidation = helpers.assertValidation.bind(null, target),
assertNotFound = helpers.assertNotFound.bind(null, target),
assertInvalidArgument = helpers.assertInvalidArgument.bind(null, target)
assertInternalFailure = helpers.assertInternalFailure.bind(null, target)
describe('getShardIterator', function() {
describe('serializations', function() {
it('should return SerializationException when ShardId is not an String', function(done) {
assertType('ShardId', 'String', done)
})
it('should return SerializationException when ShardIteratorType is not a String', function(done) {
assertType('ShardIteratorType', 'String', done)
})
it('should return SerializationException when StartingSequenceNumber is not a String', function(done) {
assertType('StartingSequenceNumber', 'String', done)
})
it('should return SerializationException when StreamName is not a String', function(done) {
assertType('StreamName', 'String', done)
})
it('should return SerializationException when Timestamp is not a Timestamp', function(done) {
assertType('Timestamp', 'Timestamp', done)
})
})
describe('validations', function() {
it('should return ValidationException for no StreamName', function(done) {
assertValidation({}, [
'Value null at \'shardId\' failed to satisfy constraint: ' +
'Member must not be null',
'Value null at \'shardIteratorType\' failed to satisfy constraint: ' +
'Member must not be null',
'Value null at \'streamName\' failed to satisfy constraint: ' +
'Member must not be null',
], done)
})
it('should return ValidationException for empty StreamName', function(done) {
assertValidation({StreamName: '', ShardIteratorType: '', ShardId: '', StartingSequenceNumber: ''}, [
'Value \'\' at \'shardId\' failed to satisfy constraint: ' +
'Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+',
'Value \'\' at \'shardId\' failed to satisfy constraint: ' +
'Member must have length greater than or equal to 1',
'Value \'\' at \'shardIteratorType\' failed to satisfy constraint: ' +
'Member must satisfy enum value set: [AFTER_SEQUENCE_NUMBER, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, TRIM_HORIZON]',
'Value \'\' at \'startingSequenceNumber\' failed to satisfy constraint: ' +
'Member must satisfy regular expression pattern: 0|([1-9]\\d{0,128})',
'Value \'\' at \'streamName\' failed to satisfy constraint: ' +
'Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+',
'Value \'\' at \'streamName\' failed to satisfy constraint: ' +
'Member must have length greater than or equal to 1',
], done)
})
it('should return ValidationException for long StreamName', function(done) {
var name = new Array(129 + 1).join('a')
assertValidation({StreamName: name, ShardId: name, ShardIteratorType: 'LATEST'}, [
'Value \'' + name + '\' at \'shardId\' failed to satisfy constraint: ' +
'Member must have length less than or equal to 128',
'Value \'' + name + '\' at \'streamName\' failed to satisfy constraint: ' +
'Member must have length less than or equal to 128',
], done)
})
it('should return ValidationException for long StreamName', function(done) {
var name = new Array(129 + 1).join('a')
assertValidation({StreamName: name, ShardId: name, ShardIteratorType: 'LATEST'}, [
'Value \'' + name + '\' at \'shardId\' failed to satisfy constraint: ' +
'Member must have length less than or equal to 128',
'Value \'' + name + '\' at \'streamName\' failed to satisfy constraint: ' +
'Member must have length less than or equal to 128',
], done)
})
it('should return ResourceNotFoundException if unknown stream and shard ID just small enough', function(done) {
var name1 = randomName(), name2 = '2147483647'
assertNotFound({StreamName: name1, ShardId: name2, ShardIteratorType: 'LATEST'},
'Shard shardId-002147483647 in stream ' + name1 + ' under account ' +
helpers.awsAccountId + ' does not exist', done)
})
it('should return ResourceNotFoundException if unknown stream and random shard name', function(done) {
var name1 = randomName(), name2 = randomName() + '-2147483647'
assertNotFound({StreamName: name1, ShardId: name2, ShardIteratorType: 'LATEST'},
'Shard shardId-002147483647 in stream ' + name1 + ' under account ' +
helpers.awsAccountId + ' does not exist', done)
})
it('should return ResourceNotFoundException if unknown stream and short prefix', function(done) {
var name1 = randomName(), name2 = 'a-00002147483647'
assertNotFound({StreamName: name1, ShardId: name2, ShardIteratorType: 'LATEST'},
'Shard shardId-002147483647 in stream ' + name1 + ' under account ' +
helpers.awsAccountId + ' does not exist', done)
})
it('should return ResourceNotFoundException if unknown stream and no prefix', function(done) {
var name1 = randomName(), name2 = '-00002147483647'
assertNotFound({StreamName: name1, ShardId: name2, ShardIteratorType: 'LATEST'},
'Shard shardId-002147483647 in stream ' + name1 + ' under account ' +
helpers.awsAccountId + ' does not exist', done)
})
it('should return ResourceNotFoundException if unknown stream and shard ID too big', function(done) {
var name1 = randomName(), name2 = '2147483648'
assertNotFound({StreamName: name1, ShardId: name2, ShardIteratorType: 'LATEST'},
'Could not find shard ' + name2 + ' in stream ' + name1 + ' under account ' + helpers.awsAccountId + '.', done)
})
it('should return ResourceNotFoundException if unknown stream and raw shard ID too big', function(done) {
var name1 = randomName(), name2 = 'shardId-002147483648'
assertNotFound({StreamName: name1, ShardId: name2, ShardIteratorType: 'LATEST'},
'Could not find shard ' + name2 + ' in stream ' + name1 + ' under account ' + helpers.awsAccountId + '.', done)
})
it('should return ResourceNotFoundException if unknown stream and string shard ID', function(done) {
var name1 = randomName(), name2 = 'ABKLFD8'
assertNotFound({StreamName: name1, ShardId: name2, ShardIteratorType: 'LATEST'},
'Could not find shard ' + name2 + ' in stream ' + name1 + ' under account ' + helpers.awsAccountId + '.', done)
})
it('should return ResourceNotFoundException if unknown stream and exponent shard ID', function(done) {
var name1 = randomName(), name2 = '2.14E4'
assertNotFound({StreamName: name1, ShardId: name2, ShardIteratorType: 'LATEST'},
'Could not find shard ' + name2 + ' in stream ' + name1 + ' under account ' + helpers.awsAccountId + '.', done)
})
it('should return ResourceNotFoundException if known stream and raw shard ID does not exist', function(done) {
var name1 = helpers.testStream, name2 = 'shardId-5'
assertNotFound({StreamName: name1, ShardId: name2, ShardIteratorType: 'AT_SEQUENCE_NUMBER', StartingSequenceNumber: '5'},
'Shard shardId-000000000005 in stream ' + name1 + ' under account ' +
helpers.awsAccountId + ' does not exist', done)
})
it('should return InvalidArgumentException if AT_SEQUENCE_NUMBER and no StartingSequenceNumber', function(done) {
assertInvalidArgument({StreamName: helpers.testStream, ShardId: 'shardId-0', ShardIteratorType: 'AT_SEQUENCE_NUMBER'},
'Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and StartingSequenceNumber or ' +
'(2) TRIM_HORIZON or LATEST and no StartingSequenceNumber. ' +
'Request specified AT_SEQUENCE_NUMBER and no StartingSequenceNumber.', done)
})
it('should return InvalidArgumentException if AFTER_SEQUENCE_NUMBER and no StartingSequenceNumber', function(done) {
assertInvalidArgument({StreamName: helpers.testStream, ShardId: 'shardId-0', ShardIteratorType: 'AFTER_SEQUENCE_NUMBER'},
'Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and StartingSequenceNumber or ' +
'(2) TRIM_HORIZON or LATEST and no StartingSequenceNumber. ' +
'Request specified AFTER_SEQUENCE_NUMBER and no StartingSequenceNumber.', done)
})
it('should return InvalidArgumentException if LATEST and StartingSequenceNumber', function(done) {
assertInvalidArgument({StreamName: helpers.testStream, ShardId: 'shardId-0', ShardIteratorType: 'LATEST', StartingSequenceNumber: '5'},
'Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and StartingSequenceNumber or ' +
'(2) TRIM_HORIZON or LATEST and no StartingSequenceNumber. ' +
'Request specified LATEST and also a StartingSequenceNumber.', done)
})
it('should return InvalidArgumentException if TRIM_HORIZON and StartingSequenceNumber', function(done) {
assertInvalidArgument({StreamName: helpers.testStream, ShardId: 'shardId-0', ShardIteratorType: 'TRIM_HORIZON', StartingSequenceNumber: '5'},
'Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and StartingSequenceNumber or ' +
'(2) TRIM_HORIZON or LATEST and no StartingSequenceNumber. ' +
'Request specified TRIM_HORIZON and also a StartingSequenceNumber.', done)
})
it('should return InvalidArgumentException if shard mismatches simple sequence', function(done) {
var name1 = helpers.testStream, name2 = 'shardId-0'
assertInvalidArgument({StreamName: name1, ShardId: name2, ShardIteratorType: 'AT_SEQUENCE_NUMBER', StartingSequenceNumber: '5'},
'Invalid StartingSequenceNumber. It encodes shardId-000000000005, ' +
'while it was used in a call to a shard with shardId-000000000000', done)
})
it('should return InternalFailure if using small (old?) sequence number', function(done) {
var name1 = helpers.testStream, name2 = 'shardId-0'
assertInternalFailure({StreamName: name1, ShardId: name2, ShardIteratorType: 'AT_SEQUENCE_NUMBER', StartingSequenceNumber: '0'}, done)
})
it('should return InvalidArgumentException if using sequence number with large date', function(done) {
var name1 = helpers.testStream, name2 = 'shardId-0', seq = new BigNumber('13bb2cc3d80000000000000000000000', 16).toFixed()
assertInvalidArgument({StreamName: name1, ShardId: name2, ShardIteratorType: 'AT_SEQUENCE_NUMBER', StartingSequenceNumber: seq},
'StartingSequenceNumber 26227199374822427428162556223570313216 used in GetShardIterator on shard ' +
'shardId-000000000000 in stream ' + name1 + ' under account ' + helpers.awsAccountId +
' is invalid.', done)
})
it('should return InvalidArgumentException for 8 in index in StartingSequenceNumber', function(done) {
var seq = new BigNumber('20000000000800000000000000000000000000000000002', 16).toFixed()
assertInvalidArgument({StreamName: helpers.testStream, ShardId: 'shardId-0', ShardIteratorType: 'AT_SEQUENCE_NUMBER', StartingSequenceNumber: seq},
'StartingSequenceNumber ' + seq + ' used in GetShardIterator on shard ' +
'shardId-000000000000 in stream ' + helpers.testStream + ' under account ' + helpers.awsAccountId +
' is invalid.', done)
})
it('should return InvalidArgumentException for 8 near end of StartingSequenceNumber', function(done) {
var seq = new BigNumber('20000000000000000000000000000000000000800000002', 16).toFixed()
assertInvalidArgument({StreamName: helpers.testStream, ShardId: 'shardId-0', ShardIteratorType: 'AT_SEQUENCE_NUMBER', StartingSequenceNumber: seq},
'Invalid StartingSequenceNumber. It encodes shardId--02147483648, ' +
'while it was used in a call to a shard with shardId-000000000000', done)
})
it('should return InvalidArgumentException if AT_TIMESTAMP and no Timestamp', function(done) {
assertInvalidArgument({StreamName: helpers.testStream, ShardId: 'shardId-0', ShardIteratorType: 'AT_TIMESTAMP'},
'Must specify timestampInMillis parameter for iterator of type AT_TIMESTAMP. Current request has no timestamp parameter.', done)
})
it('should return InvalidArgumentException if Timestamp in the future', function(done) {
var theFuture = (Date.now() / 1000) + 2
assertInvalidArgument({StreamName: helpers.testStream, ShardId: 'shardId-0', ShardIteratorType: 'AT_TIMESTAMP', Timestamp: theFuture},
new RegExp('^The timestampInMillis parameter cannot be greater than the currentTimestampInMillis. ' +
'timestampInMillis: ' + theFuture * 1000 + ', currentTimestampInMillis: \\d+$'), done)
})
})
describe('functionality', function() {
it('should work with random shard ID with hyphen', function(done) {
function testType(shardIteratorType, cb) {
request(opts({
StreamName: helpers.testStream,
ShardId: randomName() + '-0',
ShardIteratorType: shardIteratorType,
}), function(err, res) {
if (err) return cb(err)
res.statusCode.should.equal(200)
Object.keys(res.body).should.eql(['ShardIterator'])
helpers.assertShardIterator(res.body.ShardIterator, helpers.testStream)
cb()
})
}
async.forEach(['LATEST', 'TRIM_HORIZON'], testType, done)
})
it('should work with different length stream names', function(done) {
this.timeout(100000)
var stream1 = (randomName() + new Array(14).join('0')).slice(0, 29),
stream2 = (randomName() + new Array(14).join('0')).slice(0, 30)
request(helpers.opts('CreateStream', {StreamName: stream1, ShardCount: 1}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)
request(helpers.opts('CreateStream', {StreamName: stream2, ShardCount: 1}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)
helpers.waitUntilActive(stream1, function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)
request(opts({
StreamName: stream1,
ShardId: 'shard-0',
ShardIteratorType: 'LATEST',
}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)
Object.keys(res.body).should.eql(['ShardIterator'])
helpers.assertShardIterator(res.body.ShardIterator, stream1)
helpers.waitUntilActive(stream2, function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)
request(opts({
StreamName: stream2,
ShardId: 'shard-0',
ShardIteratorType: 'LATEST',
}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)
Object.keys(res.body).should.eql(['ShardIterator'])
helpers.assertShardIterator(res.body.ShardIterator, stream2)
request(helpers.opts('DeleteStream', {StreamName: stream1}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)
request(helpers.opts('DeleteStream', {StreamName: stream2}), done)
})
})
})
})
})
})
})
})
})
})