Skip to content

Commit 7ce6689

Browse files
authored
Add JSON and CSV output to change-stream-monitor snippet (#11)
* Update changestreammonitor.js Adds capability to generate the monitor's output in CSV (with custom delimiter) and in JSON format. Still pending: update documentation. * Update changestreammonitor.js Remove unnecessary output * Update changestreammonitor.js Changes order of arguments in listChangeStreamsAsCSV * Fixes missing delimiter parameter * Adds documentation * Update README.md * Updates snippet's version
1 parent 6856126 commit 7ce6689

File tree

3 files changed

+179
-13
lines changed

3 files changed

+179
-13
lines changed

snippets/change-streams-monitor/README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
- [Sample Output - Normal Mode](#sample-output---normal-mode)
77
- [Sample Output - Extended](#sample-output---extended)
88
- [listChangeStreams.help()](#listchangestreamshelp)
9+
- [listChangeStreamsAsTable(extended?: boolean, allUsers?: boolean, nsFilter?: Array)](#listchangestreamsastableextended-boolean-allusers-boolean-nsfilter-array)
10+
- [listChangeStreamsAsTable.help()](#listchangestreamsastablehelp)
11+
- [listChangeStreamsAsJSON(extended?: boolean, allUsers?: boolean, nsFilter?: Array)](#listchangestreamsasjsonextended-boolean-allusers-boolean-nsfilter-array)
12+
- [listChangeStreamsAsJSON.help()](#listchangestreamsasjsonhelp)
13+
- [listChangeStreamsAsCSV(extended?: boolean, delimiter: string, allUsers?: boolean, nsFilter?: Array)](#listchangestreamsascsvextended-boolean-delimiter-string-allusers-boolean-nsfilter-array)
14+
- [listChangeStreamsAsCSV.help()](#listchangestreamsascsvhelp)
915
- [prettyPrintChangeStreamPipeline(connectionId: any)](#prettyprintchangestreampipelineconnectionid-any)
1016
- [Example](#example)
1117
- [prettyPrintChangeStreamPipeline.help()](#prettyprintchangestreampipelinehelp)
@@ -157,6 +163,24 @@ Found 2 change streams
157163
## listChangeStreams.help()
158164
Provides help on how to use the function.
159165

166+
## listChangeStreamsAsTable(extended?: boolean, allUsers?: boolean, nsFilter?: Array<string>)
167+
Alias for `listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: Array<string>)`
168+
169+
## listChangeStreamsAsTable.help()
170+
Provides help on how to use the function. Alias for `listChangeStreams.help()`
171+
172+
## listChangeStreamsAsJSON(extended?: boolean, allUsers?: boolean, nsFilter?: Array<string>)
173+
Prints the currently open Change Streams as a JSON string. A JSON string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). See documentation for `listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: Array<string>)` for more details about the available parameters.
174+
175+
## listChangeStreamsAsJSON.help()
176+
Provides help on how to use the function.
177+
178+
## listChangeStreamsAsCSV(extended?: boolean, delimiter: string, allUsers?: boolean, nsFilter?: Array<string>)
179+
Prints the currently open Change Streams as a CSV string with "||||" as the default delimeter. A string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). The delimiter parameter allows overriding the default delimiter. See documentation for `listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: Array<string>)` for more details about the other available parameters.
180+
181+
## listChangeStreamsAsCSV.help()
182+
Provides help on how to use the function.
183+
160184
## prettyPrintChangeStreamPipeline(connectionId: any)
161185

162186
Pretty prints the Change Stream pipeline for a given Connection ID.

snippets/change-streams-monitor/changestreammonitor.js

Lines changed: 154 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,21 @@ const localRequire = require("module").createRequire(__filename);
22
const { Table } = localRequire("to-tabel");
33
const { templates } = localRequire("boks");
44

5-
function _listChangeStreams (extended = false, allUsers = true, nsFilter = []) {
5+
const OutputTypeEnum = {
6+
TABLE: 'TABLE',
7+
JSON: 'JSON',
8+
CSV : 'CSV'
9+
};
10+
11+
const PipelineFormatEnum = {
12+
EJSON : 'EJSON',
13+
NONE: 'NONE',
14+
JSON: 'JSON'
15+
};
16+
17+
const DEFAULT_DELIMITER="||||"
18+
19+
function _listChangeStreams (extended = false, allUsers = true, nsFilter = [], outputType = OutputTypeEnum.TABLE, pipelineFormat=PipelineFormatEnum.JSON, delimiter=DEFAULT_DELIMITER) {
620
tableData = [];
721
let changeStreamsDataRaw = getChangeStreams(allUsers, nsFilter);
822

@@ -16,11 +30,20 @@ function _listChangeStreams (extended = false, allUsers = true, nsFilter = []) {
1630
} catch (error) {}
1731

1832
//format the pipeline for better rendering
19-
let changeStreamPipeline = EJSON.stringify(
20-
changeStreamOpData.cursor.originatingCommand.pipeline,
21-
null,
22-
1
23-
);
33+
let changeStreamPipeline = ""
34+
switch (pipelineFormat){
35+
case PipelineFormatEnum.EJSON:
36+
changeStreamPipeline = EJSON.stringify(changeStreamOpData.cursor.originatingCommand.pipeline, null,1)
37+
break;
38+
case PipelineFormatEnum.JSON:
39+
changeStreamPipeline = JSON.stringify(changeStreamOpData.cursor.originatingCommand.pipeline)
40+
break;
41+
case PipelineFormatEnum.NONE:
42+
changeStreamPipeline = changeStreamOpData.cursor.originatingCommand.pipeline
43+
break;
44+
default:
45+
throw new Error("Internal Error: Unexepected PipelineFormatEnum value " + pipelineFormat)
46+
}
2447

2548
let usersStr = "";
2649
if (changeStreamOpData.effectiveUsers){
@@ -66,17 +89,30 @@ function _listChangeStreams (extended = false, allUsers = true, nsFilter = []) {
6689

6790
})
6891

69-
customConsoleTable(tableData, extended);
70-
print("Found " + changeStreamsDataRaw.length + " change streams");
92+
switch (outputType){
93+
case OutputTypeEnum.TABLE:
94+
generateTableOutput(tableData, extended);
95+
print("Found " + changeStreamsDataRaw.length + " change streams");
96+
break;
97+
case OutputTypeEnum.JSON:
98+
generateJsonOutput(tableData, extended);
99+
break;
100+
case OutputTypeEnum.CSV:
101+
generateCsvOutput(tableData, extended, delimiter);
102+
break;
103+
default:
104+
throw new Error("Internal Error: Unexepected OutputTypeEnum value " + outputType)
105+
}
71106
};
72107

73108
function _listChangeStreamsHelp(){
74109
print("listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: any): void")
110+
print("listChangeStreamsAsTable(extended?: boolean, allUsers?: boolean, nsFilter?: any): void")
75111
print("Prints a table with the currently open Change Streams. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour).")
76112
print("\t See prettyPrintChangeStreamPipeline.help() to pretty print a change stream pipeline. ")
77113
print("\t See ChangeStreamsData.help() and ExtendedChangeStreamsData.help() for data outputted in extended and non-extended mode.")
78114
print("\t @param extended — Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.")
79-
print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defailts to true.")
115+
print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defaults to true.")
80116
print("\t @param nsFilter — An optional array of namespace filter. Defaults to [] i.e. to filter.")
81117
}
82118

@@ -88,12 +124,70 @@ function _listChangeStreamsHelp(){
88124
* @param {boolean} allUsers Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e.
89125
* If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command.
90126
* If set to true, $currentOp reports operations belonging to all users.
91-
* Defailts to true.
127+
* Defaults to true.
92128
* @param {Array.<string>} nsFilter An optional array of namespace filter. Defaults to [] i.e. to filter.
93129
*/
94-
globalThis.listChangeStreams = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter);}
130+
globalThis.listChangeStreams = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.TABLE, PipelineFormatEnum.EJSON);}
95131
globalThis.listChangeStreams.help = function () {_listChangeStreamsHelp();}
96132

133+
/**
134+
* Alias for {@link listChangeStreams}
135+
*/
136+
globalThis.listChangeStreamsAsTable = globalThis.listChangeStreams
137+
globalThis.listChangeStreamsAsTable.help = function () {_listChangeStreamsHelp();}
138+
139+
140+
function _listChangeStreamsAsJSONHelp(){
141+
print("listChangeStreamsAsJSON(extended?: boolean, allUsers?: boolean, nsFilter?: any): void")
142+
print("Prints the currently open Change Streams as a JSON string. A JSON string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour).")
143+
print("\t See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline. ")
144+
print("\t See ChangeStreamsData.help() and ExtendedChangeStreamsData.help() for data outputted in extended and non-extended mode.")
145+
print("\t @param extended — Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.")
146+
print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defaults to true.")
147+
print("\t @param nsFilter — An optional array of namespace filter. Defaults to [] i.e. to filter.")
148+
}
149+
150+
/**
151+
* Prints the currently open Change Streams as a JSON string. A JSON string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour).
152+
* See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline.
153+
* See ChangeStreamsData and ExtendedChangeStreamsData for data outputted in extended and non-extended mode.
154+
* @param {boolean} extended Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.
155+
* @param {boolean} allUsers Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e.
156+
* If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command.
157+
* If set to true, $currentOp reports operations belonging to all users.
158+
* Defaults to true.
159+
* @param {Array.<string>} nsFilter An optional array of namespace filter. Defaults to [] i.e. to filter.
160+
*/
161+
globalThis.listChangeStreamsAsJSON = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.JSON, PipelineFormatEnum.NONE);}
162+
globalThis.listChangeStreamsAsJSON.help = function () {_listChangeStreamsAsJSONHelp();}
163+
164+
165+
function _listChangeStreamsAsCSVHelp(){
166+
print("listChangeStreamsAsJSON(extended?: boolean, delimiter?: string, allUsers?: boolean, nsFilter?: any): void")
167+
print("Prints the currently open Change Streams as a CSV string with \"" + DEFAULT_DELIMITER + "\" as the default delimeter. A string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). ")
168+
print("\t See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline. ")
169+
print("\t See ChangeStreamsData.help() and ExtendedChangeStreamsData.help() for data outputted in extended and non-extended mode.")
170+
print("\t @param extended — Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.")
171+
print("\t @param delimiter — Provide a custom delimeter for the CSV string. Defaults to \"" + DEFAULT_DELIMITER + "\"")
172+
print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defaults to true.")
173+
print("\t @param nsFilter — An optional array of namespace filter. Defaults to [] i.e. to filter.")
174+
}
175+
/**
176+
* Prints the currently open Change Streams as a CSV string with "||||" as the default delimeter. A string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour).
177+
* See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline.
178+
* See ChangeStreamsData and ExtendedChangeStreamsData for data outputted in extended and non-extended mode.
179+
* @param {boolean} extended Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.
180+
* @param {string} delimiter Provide a custom delimeter for the CSV string
181+
* @param {boolean} allUsers Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e.
182+
* If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command.
183+
* If set to true, $currentOp reports operations belonging to all users.
184+
* Defaults to true.
185+
* @param {Array.<string>} nsFilter An optional array of namespace filter. Defaults to [] i.e. to filter.
186+
*/
187+
globalThis.listChangeStreamsAsCSV = function (extended = false, delimiter=DEFAULT_DELIMITER, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.CSV, PipelineFormatEnum.JSON, delimiter);}
188+
globalThis.listChangeStreamsAsCSV.help = function () {_listChangeStreamsAsCSVHelp();}
189+
190+
97191
/**
98192
* @class Contains the data that will be presented in tabular format. This is the basic data set - @see {ExtendedChangeStreamsData} for the extended version.
99193
* @param {*} connId An identifier for the connection where the specific operation originated.
@@ -161,6 +255,13 @@ class ChangeStreamsData {
161255
let newTbl = new Table(ChangeStreamsData.headers(), options);
162256
newTbl.print();
163257
}
258+
259+
toCsvString(delimiter){
260+
return this.constructor.headers().reduce(
261+
(accumulator, currentValue) => accumulator === "" ? this[currentValue.name] : accumulator + delimiter + this[currentValue.name],
262+
""
263+
)
264+
}
164265
};
165266

166267
globalThis.ChangeStreamsData = ChangeStreamsData;
@@ -303,7 +404,7 @@ globalThis.getChangeStreams = function (allUsers, nsFilter) {
303404
* @param {*} data The data to be displayed in a table
304405
* @param {boolean} extended Whether the extended output format is being used. This is used to generate the output table headers.
305406
*/
306-
globalThis.customConsoleTable = function (data, extended) {
407+
globalThis.generateTableOutput = function (data, extended) {
307408
if (data && data.length > 0) {
308409
const options = {
309410
maxSize: process.stdout.columns - 10,
@@ -323,6 +424,47 @@ globalThis.customConsoleTable = function (data, extended) {
323424
}
324425
};
325426

427+
/**
428+
* Generates JSON output for the extracted changestream data
429+
* @param {*} data The data to be displayed in a table
430+
* @param {boolean} extended Whether the extended output format is being used. This is used to generate the output table headers.
431+
*/
432+
globalThis.generateJsonOutput = function (data, extended) {
433+
if (data && data.length > 0) {
434+
data.forEach(changeStreamOpData => {
435+
print(JSON.stringify(changeStreamOpData))
436+
})
437+
438+
} else {
439+
print("No Change Streams found!");
440+
}
441+
};
442+
443+
/**
444+
* Generates CSV output for the extracted changestream data
445+
* @param {*} data The data to be displayed in a table
446+
* @param {boolean} extended Whether the extended output format is being used. This is used to generate the output table headers.
447+
*/
448+
globalThis.generateCsvOutput = function (data, extended, delimiter) {
449+
if (data && data.length > 0) {
450+
let headersSource = extended ? ExtendedChangeStreamsData.headers() : ChangeStreamsData.headers()
451+
let headers = headersSource.map(h => h.name)
452+
let headersStr = headers.reduce(
453+
(accumulator, currentValue) => accumulator === "" ? currentValue : accumulator + delimiter + currentValue,
454+
""
455+
)
456+
print(headersStr)
457+
458+
data.forEach(changeStreamOpData => {
459+
print(changeStreamOpData.toCsvString(delimiter))
460+
})
461+
462+
} else {
463+
print("No Change Streams found!");
464+
}
465+
};
466+
467+
326468

327469
function _prettyPrintChangeStreamPipeline(connectionId) {
328470
let pipeline = [

snippets/change-streams-monitor/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@mongosh/snippet-change-stream-monitor",
33
"snippetName": "change-stream-monitor",
4-
"version": "0.1.0",
4+
"version": "0.2.0",
55
"description": "Mongosh snippet that allows users to monitor Change Streams on the current server.",
66
"main": "index.js",
77
"license": "MIT",

0 commit comments

Comments
 (0)