@@ -83,3 +83,115 @@ pub fn flatten_kinesis_logs(message: Message) -> Vec<Value> {
83
83
84
84
vec_kinesis_json
85
85
}
86
+
87
+ #[ cfg( test) ]
88
+ mod tests {
89
+ use serde_json:: { json, Value } ;
90
+
91
+ use super :: { flatten_kinesis_logs, Message } ;
92
+
93
+ #[ test]
94
+ fn flatten_kinesis_logs_decodes_base64_data ( ) {
95
+ let message: Message = serde_json:: from_value ( json ! ( {
96
+ "requestId" : "9b848d8a-2d89-474b-b073-04b8e5232210" . to_string( ) ,
97
+ "timestamp" : 1705026780451_i64 ,
98
+ "records" : [
99
+ {
100
+ "data" : "eyJDSEFOR0UiOi0wLjQ1LCJQUklDRSI6NS4zNiwiVElDS0VSX1NZTUJPTCI6IkRFRyIsIlNFQ1RPUiI6IkVORVJHWSJ9" . to_string( ) ,
101
+ } ,
102
+ {
103
+ "data" : "eyJDSEFOR0UiOjMuMTYsIlBSSUNFIjo3My43NiwiVElDS0VSX1NZTUJPTCI6IldNVCIsIlNFQ1RPUiI6IlJFVEFJTCJ9" . to_string( ) ,
104
+ } ,
105
+ ] ,
106
+ } ) ) . unwrap ( ) ;
107
+
108
+ let result = flatten_kinesis_logs ( message) ;
109
+ assert_eq ! ( result. len( ) , 2 ) ;
110
+
111
+ let Value :: Object ( map) = & result[ 0 ] else {
112
+ panic ! ( "Expected first result to be a JSON object" ) ;
113
+ } ;
114
+ assert_eq ! ( map. get( "CHANGE" ) . unwrap( ) . as_f64( ) . unwrap( ) , -0.45 ) ;
115
+ assert_eq ! ( map. get( "PRICE" ) . unwrap( ) . as_f64( ) . unwrap( ) , 5.36 ) ;
116
+ assert_eq ! ( map. get( "TICKER_SYMBOL" ) . unwrap( ) . as_str( ) . unwrap( ) , "DEG" ) ;
117
+ assert_eq ! ( map. get( "SECTOR" ) . unwrap( ) . as_str( ) . unwrap( ) , "ENERGY" ) ;
118
+ assert_eq ! (
119
+ map. get( "requestId" ) . unwrap( ) . as_str( ) . unwrap( ) ,
120
+ "9b848d8a-2d89-474b-b073-04b8e5232210"
121
+ ) ;
122
+ assert_eq ! (
123
+ map. get( "timestamp" ) . unwrap( ) . as_str( ) . unwrap( ) ,
124
+ "1705026780451"
125
+ ) ;
126
+
127
+ let Value :: Object ( map) = & result[ 1 ] else {
128
+ panic ! ( "Expected second result to be a JSON object" ) ;
129
+ } ;
130
+ assert_eq ! ( map. get( "CHANGE" ) . unwrap( ) . as_f64( ) . unwrap( ) , 3.16 ) ;
131
+ assert_eq ! ( map. get( "PRICE" ) . unwrap( ) . as_f64( ) . unwrap( ) , 73.76 ) ;
132
+ assert_eq ! ( map. get( "TICKER_SYMBOL" ) . unwrap( ) , "WMT" ) ;
133
+ assert_eq ! ( map. get( "SECTOR" ) . unwrap( ) , "RETAIL" ) ;
134
+ assert_eq ! (
135
+ map. get( "requestId" ) . unwrap( ) . as_str( ) . unwrap( ) ,
136
+ "9b848d8a-2d89-474b-b073-04b8e5232210"
137
+ ) ;
138
+ assert_eq ! (
139
+ map. get( "timestamp" ) . unwrap( ) . as_str( ) . unwrap( ) ,
140
+ "1705026780451"
141
+ ) ;
142
+ }
143
+
144
+ #[ test]
145
+ fn flatten_kinesis_logs_adds_request_id_and_timestamp ( ) {
146
+ let message: Message = serde_json:: from_value ( json ! ( {
147
+ "requestId" : "9b848d8a-2d89-474b-b073-04b8e5232210" . to_string( ) ,
148
+ "timestamp" : 1705026780451_i64 ,
149
+ "records" : [
150
+ {
151
+ "data" : "eyJDSEFOR0UiOi0wLjQ1LCJQUklDRSI6NS4zNiwiVElDS0VSX1NZTUJPTCI6IkRFRyIsIlNFQ1RPUiI6IkVORVJHWSJ9" . to_string( ) ,
152
+ } ,
153
+ {
154
+ "data" : "eyJDSEFOR0UiOjMuMTYsIlBSSUNFIjo3My43NiwiVElDS0VSX1NZTUJPTCI6IldNVCIsIlNFQ1RPUiI6IlJFVEFJTCJ9" . to_string( ) ,
155
+ } ,
156
+ ] ,
157
+ } ) ) . unwrap ( ) ;
158
+
159
+ let result = flatten_kinesis_logs ( message) ;
160
+ assert_eq ! ( result. len( ) , 2 ) ;
161
+
162
+ let event = result[ 0 ] . as_object ( ) . unwrap ( ) ;
163
+ assert_eq ! (
164
+ event. get( "requestId" ) . unwrap( ) . as_str( ) . unwrap( ) ,
165
+ "9b848d8a-2d89-474b-b073-04b8e5232210"
166
+ ) ;
167
+ assert_eq ! (
168
+ event. get( "timestamp" ) . unwrap( ) . as_str( ) . unwrap( ) ,
169
+ "1705026780451"
170
+ ) ;
171
+
172
+ let event = result[ 1 ] . as_object ( ) . unwrap ( ) ;
173
+ assert_eq ! (
174
+ event. get( "requestId" ) . unwrap( ) . as_str( ) . unwrap( ) ,
175
+ "9b848d8a-2d89-474b-b073-04b8e5232210"
176
+ ) ;
177
+ assert_eq ! (
178
+ event. get( "timestamp" ) . unwrap( ) . as_str( ) . unwrap( ) ,
179
+ "1705026780451"
180
+ ) ;
181
+ }
182
+
183
+ #[ test]
184
+ #[ should_panic( expected = "InvalidByte(7, 95)" ) ]
185
+ fn malformed_json_after_base64_decoding ( ) {
186
+ let message: Message = serde_json:: from_value ( json ! ( {
187
+ "requestId" : "9b848d8a-2d89-474b-b073-04b8e5232210" . to_string( ) ,
188
+ "timestamp" : 1705026780451_i64 ,
189
+ "records" : [ {
190
+ "data" : "invalid_base64_data" . to_string( ) ,
191
+ } ] ,
192
+ } ) )
193
+ . unwrap ( ) ;
194
+
195
+ flatten_kinesis_logs ( message) ;
196
+ }
197
+ }
0 commit comments