4
4
"database/sql"
5
5
"encoding/json"
6
6
"fmt"
7
+ "bytes"
7
8
"quackpipe/model"
8
9
"regexp"
9
10
"strings"
@@ -30,6 +31,12 @@ func ConversationOfRows(rows *sql.Rows, default_format string, duration time.Dur
30
31
return "" , err
31
32
}
32
33
return result , nil
34
+ case "JSONEachRow" , "NDJSON" :
35
+ result , err := rowsToNDJSON (rows )
36
+ if err != nil {
37
+ return "" , err
38
+ }
39
+ return result , nil
33
40
case "CSVWithNames" :
34
41
result , err := rowsToCSV (rows , true )
35
42
if err != nil {
@@ -122,6 +129,55 @@ func rowsToJSON(rows *sql.Rows, elapsedTime time.Duration) (string, error) {
122
129
return string (jsonData ), nil
123
130
}
124
131
132
+ // rowsToNDJSON converts the rows to NDJSON strings
133
+ func rowsToNDJSON (rows * sql.Rows ) (string , error ) {
134
+ defer rows .Close ()
135
+
136
+ columns , err := rows .Columns ()
137
+ if err != nil {
138
+ return "" , err
139
+ }
140
+
141
+ var buffer bytes.Buffer
142
+ values := make ([]interface {}, len (columns ))
143
+ scanArgs := make ([]interface {}, len (columns ))
144
+ for i := range values {
145
+ scanArgs [i ] = & values [i ]
146
+ }
147
+
148
+ for rows .Next () {
149
+ err := rows .Scan (scanArgs ... )
150
+ if err != nil {
151
+ return "" , err
152
+ }
153
+
154
+ rowMap := make (map [string ]interface {})
155
+ for i , col := range columns {
156
+ val := values [i ]
157
+ b , ok := val .([]byte )
158
+ if ok {
159
+ rowMap [col ] = string (b )
160
+ } else {
161
+ rowMap [col ] = val
162
+ }
163
+ }
164
+
165
+ jsonData , err := json .Marshal (rowMap )
166
+ if err != nil {
167
+ return "" , err
168
+ }
169
+
170
+ buffer .Write (jsonData )
171
+ buffer .WriteByte ('\n' )
172
+ }
173
+
174
+ if err = rows .Err (); err != nil {
175
+ return "" , err
176
+ }
177
+
178
+ return buffer .String (), nil
179
+ }
180
+
125
181
// rowsToTSV converts the rows to TSV string
126
182
func rowsToTSV (rows * sql.Rows , cols bool ) (string , error ) {
127
183
var result []string
0 commit comments