@@ -83,3 +83,143 @@ pub fn flatten_kinesis_logs(message: Message) -> Vec<Value> {
83
83
84
84
vec_kinesis_json
85
85
}
86
+
87
+ #[ cfg( test) ]
88
+ mod tests {
89
+ use serde_json:: { json, Value } ;
90
+
91
+ use super :: { flatten_kinesis_logs, Message } ;
92
+
93
+ #[ test]
94
+ fn flatten_kinesis_logs_decodes_base64_data ( ) {
95
+ let message: Message = serde_json:: from_value ( json ! ( {
96
+ "requestId" : "9b848d8a-2d89-474b-b073-04b8e5232210" . to_string( ) ,
97
+ "timestamp" : 1705026780451_i64 ,
98
+ "records" : [
99
+ {
100
+ "data" : "eyJDSEFOR0UiOi0wLjQ1LCJQUklDRSI6NS4zNiwiVElDS0VSX1NZTUJPTCI6IkRFRyIsIlNFQ1RPUiI6IkVORVJHWSJ9" . to_string( ) ,
101
+ } ,
102
+ {
103
+ "data" : "eyJDSEFOR0UiOjMuMTYsIlBSSUNFIjo3My43NiwiVElDS0VSX1NZTUJPTCI6IldNVCIsIlNFQ1RPUiI6IlJFVEFJTCJ9" . to_string( ) ,
104
+ } ,
105
+ ] ,
106
+ } ) ) . unwrap ( ) ;
107
+
108
+ let result = flatten_kinesis_logs ( message) ;
109
+ assert_eq ! ( result. len( ) , 2 ) ;
110
+
111
+ let Value :: Object ( map) = & result[ 0 ] else {
112
+ panic ! ( "Expected first result to be a JSON object" ) ;
113
+ } ;
114
+ assert_eq ! (
115
+ map. get( "CHANGE" ) . unwrap( ) ,
116
+ & Value :: Number ( serde_json:: Number :: from_f64( -0.45 ) . unwrap( ) )
117
+ ) ;
118
+ assert_eq ! (
119
+ map. get( "PRICE" ) . unwrap( ) ,
120
+ & Value :: Number ( serde_json:: Number :: from_f64( 5.36 ) . unwrap( ) )
121
+ ) ;
122
+ assert_eq ! (
123
+ map. get( "TICKER_SYMBOL" ) . unwrap( ) ,
124
+ & Value :: String ( "DEG" . to_string( ) )
125
+ ) ;
126
+ assert_eq ! (
127
+ map. get( "SECTOR" ) . unwrap( ) ,
128
+ & Value :: String ( "ENERGY" . to_string( ) )
129
+ ) ;
130
+ assert_eq ! (
131
+ map. get( "requestId" ) . unwrap( ) ,
132
+ & Value :: String ( "9b848d8a-2d89-474b-b073-04b8e5232210" . to_string( ) )
133
+ ) ;
134
+ assert_eq ! (
135
+ map. get( "timestamp" ) . unwrap( ) ,
136
+ & Value :: String ( "1705026780451" . to_string( ) )
137
+ ) ;
138
+
139
+ let Value :: Object ( map) = & result[ 1 ] else {
140
+ panic ! ( "Expected second result to be a JSON object" ) ;
141
+ } ;
142
+ assert_eq ! (
143
+ map. get( "CHANGE" ) . unwrap( ) ,
144
+ & Value :: Number ( serde_json:: Number :: from_f64( 3.16 ) . unwrap( ) )
145
+ ) ;
146
+ assert_eq ! (
147
+ map. get( "PRICE" ) . unwrap( ) ,
148
+ & Value :: Number ( serde_json:: Number :: from_f64( 73.76 ) . unwrap( ) )
149
+ ) ;
150
+ assert_eq ! (
151
+ map. get( "TICKER_SYMBOL" ) . unwrap( ) ,
152
+ & Value :: String ( "WMT" . to_string( ) )
153
+ ) ;
154
+ assert_eq ! (
155
+ map. get( "SECTOR" ) . unwrap( ) ,
156
+ & Value :: String ( "RETAIL" . to_string( ) )
157
+ ) ;
158
+ assert_eq ! (
159
+ map. get( "requestId" ) . unwrap( ) ,
160
+ & Value :: String ( "9b848d8a-2d89-474b-b073-04b8e5232210" . to_string( ) )
161
+ ) ;
162
+ assert_eq ! (
163
+ map. get( "timestamp" ) . unwrap( ) ,
164
+ & Value :: String ( "1705026780451" . to_string( ) )
165
+ ) ;
166
+ }
167
+
168
+ #[ test]
169
+ fn flatten_kinesis_logs_adds_request_id_and_timestamp ( ) {
170
+ let message: Message = serde_json:: from_value ( json ! ( {
171
+ "requestId" : "9b848d8a-2d89-474b-b073-04b8e5232210" . to_string( ) ,
172
+ "timestamp" : 1705026780451_i64 ,
173
+ "records" : [
174
+ {
175
+ "data" : "eyJDSEFOR0UiOi0wLjQ1LCJQUklDRSI6NS4zNiwiVElDS0VSX1NZTUJPTCI6IkRFRyIsIlNFQ1RPUiI6IkVORVJHWSJ9" . to_string( ) ,
176
+ } ,
177
+ {
178
+ "data" : "eyJDSEFOR0UiOjMuMTYsIlBSSUNFIjo3My43NiwiVElDS0VSX1NZTUJPTCI6IldNVCIsIlNFQ1RPUiI6IlJFVEFJTCJ9" . to_string( ) ,
179
+ } ,
180
+ ] ,
181
+ } ) ) . unwrap ( ) ;
182
+
183
+ let result = flatten_kinesis_logs ( message) ;
184
+ assert_eq ! ( result. len( ) , 2 ) ;
185
+
186
+ let Value :: Object ( map) = & result[ 0 ] else {
187
+ panic ! ( "Expected first result to be a JSON object" ) ;
188
+ } ;
189
+ assert_eq ! (
190
+ map. get( "requestId" ) . unwrap( ) ,
191
+ & Value :: String ( "9b848d8a-2d89-474b-b073-04b8e5232210" . to_string( ) )
192
+ ) ;
193
+ assert_eq ! (
194
+ map. get( "timestamp" ) . unwrap( ) ,
195
+ & Value :: String ( "1705026780451" . to_string( ) )
196
+ ) ;
197
+
198
+ let Value :: Object ( map) = & result[ 1 ] else {
199
+ panic ! ( "Expected second result to be a JSON object" ) ;
200
+ } ;
201
+ assert_eq ! (
202
+ map. get( "requestId" ) . unwrap( ) ,
203
+ & Value :: String ( "9b848d8a-2d89-474b-b073-04b8e5232210" . to_string( ) )
204
+ ) ;
205
+ assert_eq ! (
206
+ map. get( "timestamp" ) . unwrap( ) ,
207
+ & Value :: String ( "1705026780451" . to_string( ) )
208
+ ) ;
209
+ }
210
+
211
+ #[ test]
212
+ #[ should_panic( expected = "InvalidByte(7, 95)" ) ]
213
+ fn malformed_json_after_base64_decoding ( ) {
214
+ let message: Message = serde_json:: from_value ( json ! ( {
215
+ "requestId" : "9b848d8a-2d89-474b-b073-04b8e5232210" . to_string( ) ,
216
+ "timestamp" : 1705026780451_i64 ,
217
+ "records" : [ {
218
+ "data" : "invalid_base64_data" . to_string( ) ,
219
+ } ] ,
220
+ } ) )
221
+ . unwrap ( ) ;
222
+
223
+ flatten_kinesis_logs ( message) ;
224
+ }
225
+ }
0 commit comments