-
Notifications
You must be signed in to change notification settings - Fork 181
/
Copy pathfetchDataStream.js
98 lines (92 loc) · 2.9 KB
/
fetchDataStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* Process as many rows as possible from a data stream.
*
* @param {array} data - a chunk of data read from the data stream
* @param {function} rowcb - The row callback function
* @param {function} endstreamcb - The stream termination callback
* function
* @return {object} An object containing keys "remainder" which is
* a slice of any unprocessed data, and a key "eos" which is a boolean
* indicating whether the end of the stream has been reached.
*/
async function processLines(data, rowcb, endstreamcb) {
const utf8Decoder = new TextDecoder('utf-8');
let row = [];
let colStart = -1;
let rowStart = 0;
for (let i = 0; i < data.length; i++) {
switch (data[i]) {
case 0x1e: // end of column
const rowdata = data.slice(colStart+1, i);
const encoded = utf8Decoder.decode(rowdata);
colStart = i;
row.push(encoded);
continue;
case 0x1f: // end of row
const rowdata2 = data.slice(colStart+1, i);
const encoded2 = utf8Decoder.decode(rowdata2);
row.push(encoded2);
rowcb(row);
rowStart = i+1;
colStart = i;
row = [];
continue;
case 0x04: // end of stream
endstreamcb(row);
return {remainder: [], eos: true};
}
}
return {remainder: data.slice(rowStart), eos: false};
}
/**
* fetchDataStream fetches a data stream from dataURL where
* rows are denoted by the byte 0x1f, columns by 0x1e, and
* the stream is terminated by 0x04.
*
* @param {string} dataURL - the URL of the stream to fetch
* @param {function} rowcb - A callback to call for each row
* @param {function} chunkcb - A callback to call for each chunk
* read from the stream.
* @param {function} endstreamcb - A callback to call when the final
* byte is read from the stream.
* @param {string} method - the HTTP method to use for the request
*/
async function fetchDataStream(dataURL, rowcb, chunkcb, endstreamcb, method) {
const response = await fetch(
dataURL,
{
method: method || 'get',
credentials: 'same-origin',
},
);
const reader = response.body.getReader();
let remainder = [];
let doneLoop = false;
while (!doneLoop) {
await reader.read().then(({done, value}) => {
let combined;
if (remainder.length == 0) {
combined = value;
} else {
combined = new Uint8Array(
value.length + remainder.length
);
for (let i = 0; i < remainder.length; i++) {
combined[i] = remainder[i];
}
for (let i = 0; i < value.length; i++) {
combined[i+remainder.length] = value[i];
}
}
return processLines(combined, rowcb, endstreamcb);
}).then(({remainder: rem, eos}) => {
chunkcb(eos);
doneLoop = eos;
remainder = rem;
}).catch((err) => {
console.error(err);
doneLoop = true;
});
}
}
export default fetchDataStream;