17
17
18
18
//! Metadata table api.
19
19
20
+ use std:: collections:: HashSet ;
20
21
use std:: sync:: Arc ;
21
22
22
- use arrow_array:: builder:: { MapBuilder , PrimitiveBuilder , StringBuilder } ;
23
+ use arrow_array:: builder:: { BooleanBuilder , MapBuilder , PrimitiveBuilder , StringBuilder } ;
23
24
use arrow_array:: types:: { Int64Type , TimestampMillisecondType } ;
24
25
use arrow_array:: RecordBatch ;
25
26
use arrow_schema:: { DataType , Field , Schema , TimeUnit } ;
26
27
27
- use crate :: spec:: TableMetadataRef ;
28
+ use crate :: spec:: { SnapshotRef , TableMetadataRef } ;
28
29
use crate :: table:: Table ;
29
30
use crate :: Result ;
30
31
@@ -50,6 +51,11 @@ impl MetadataScan {
50
51
pub fn snapshots ( & self ) -> Result < RecordBatch > {
51
52
SnapshotsTable :: scan ( self )
52
53
}
54
+
55
+ /// Return the history of the table.
56
+ pub fn history ( & self ) -> Result < RecordBatch > {
57
+ HistoryTable :: scan ( self )
58
+ }
53
59
}
54
60
55
61
/// Table metadata scan.
@@ -137,6 +143,93 @@ impl MetadataTable for SnapshotsTable {
137
143
}
138
144
}
139
145
146
/// The `history` metadata table.
///
/// Records how the table's current snapshot has changed over time, i.e. when
/// each snapshot became the current one.
///
/// Compared to the [Snapshots][SnapshotsTable] table it carries less detail per
/// snapshot, but it adds ancestry information relative to the current snapshot.
///
/// The `is_current_ancestor` column is `true` when the snapshot is an ancestor
/// of the current snapshot; `false` means the snapshot was rolled back.
pub struct HistoryTable;
157
+
158
+ impl MetadataTable for HistoryTable {
159
+ fn schema ( ) -> Schema {
160
+ Schema :: new ( vec ! [
161
+ Field :: new(
162
+ "made_current_at" ,
163
+ DataType :: Timestamp ( TimeUnit :: Millisecond , Some ( "+00:00" . into( ) ) ) ,
164
+ false ,
165
+ ) ,
166
+ Field :: new( "snapshot_id" , DataType :: Int64 , false ) ,
167
+ Field :: new( "parent_id" , DataType :: Int64 , true ) ,
168
+ Field :: new( "is_current_ancestor" , DataType :: Boolean , false ) ,
169
+ ] )
170
+ }
171
+
172
+ fn scan ( scan : & MetadataScan ) -> Result < RecordBatch > {
173
+ let mut made_current_at =
174
+ PrimitiveBuilder :: < TimestampMillisecondType > :: new ( ) . with_timezone ( "+00:00" ) ;
175
+ let mut snapshot_id = PrimitiveBuilder :: < Int64Type > :: new ( ) ;
176
+ let mut parent_id = PrimitiveBuilder :: < Int64Type > :: new ( ) ;
177
+ let mut is_current_ancestor = BooleanBuilder :: new ( ) ;
178
+
179
+ let ancestors: HashSet < i64 > =
180
+ Ancestors :: new ( scan. metadata_ref . current_snapshot ( ) , & scan. metadata_ref )
181
+ . map ( |snapshot| snapshot. snapshot_id ( ) )
182
+ . collect ( ) ;
183
+
184
+ for snapshot in scan. metadata_ref . snapshots ( ) {
185
+ made_current_at. append_value ( snapshot. timestamp_ms ( ) ) ;
186
+ snapshot_id. append_value ( snapshot. snapshot_id ( ) ) ;
187
+ parent_id. append_option ( snapshot. parent_snapshot_id ( ) ) ;
188
+ is_current_ancestor. append_value ( ancestors. contains ( & snapshot. snapshot_id ( ) ) ) ;
189
+ }
190
+
191
+ Ok ( RecordBatch :: try_new ( Arc :: new ( Self :: schema ( ) ) , vec ! [
192
+ Arc :: new( made_current_at. finish( ) ) ,
193
+ Arc :: new( snapshot_id. finish( ) ) ,
194
+ Arc :: new( parent_id. finish( ) ) ,
195
+ Arc :: new( is_current_ancestor. finish( ) ) ,
196
+ ] ) ?)
197
+ }
198
+ }
199
+
200
+ struct Ancestors < ' a > {
201
+ current_snapshot : Option < & ' a SnapshotRef > ,
202
+ table_metadata : & ' a TableMetadataRef ,
203
+ }
204
+
205
+ impl < ' a > Ancestors < ' a > {
206
+ fn new (
207
+ current_snapshot : Option < & ' a SnapshotRef > ,
208
+ table_metadata : & ' a TableMetadataRef ,
209
+ ) -> Self {
210
+ Ancestors {
211
+ current_snapshot,
212
+ table_metadata,
213
+ }
214
+ }
215
+ }
216
+
217
+ impl < ' a > Iterator for Ancestors < ' a > {
218
+ type Item = & ' a SnapshotRef ;
219
+
220
+ fn next ( & mut self ) -> Option < Self :: Item > {
221
+ if let Some ( snapshot) = self . current_snapshot {
222
+ self . current_snapshot = match snapshot. parent_snapshot_id ( ) {
223
+ Some ( parent_snapshot_id) => self . table_metadata . snapshot_by_id ( parent_snapshot_id) ,
224
+ None => None ,
225
+ } ;
226
+ Some ( snapshot)
227
+ } else {
228
+ None
229
+ }
230
+ }
231
+ }
232
+
140
233
#[ cfg( test) ]
141
234
mod tests {
142
235
use expect_test:: { expect, Expect } ;
@@ -262,4 +355,41 @@ mod tests {
262
355
Some ( "committed_at" ) ,
263
356
) ;
264
357
}
358
+
359
+ #[ test]
360
+ fn test_history_table ( ) {
361
+ let table = TableTestFixture :: new ( ) . table ;
362
+ let record_batch = table. metadata_scan ( ) . history ( ) . unwrap ( ) ;
363
+ check_record_batch (
364
+ record_batch,
365
+ expect ! [ [ r#"
366
+ Field { name: "made_current_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
367
+ Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
368
+ Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
369
+ Field { name: "is_current_ancestor", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"# ] ] ,
370
+ expect ! [ [ r#"
371
+ made_current_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
372
+ [
373
+ 2018-01-04T21:22:35.770+00:00,
374
+ 2019-04-12T20:29:15.770+00:00,
375
+ ],
376
+ snapshot_id: PrimitiveArray<Int64>
377
+ [
378
+ 3051729675574597004,
379
+ 3055729675574597004,
380
+ ],
381
+ parent_id: PrimitiveArray<Int64>
382
+ [
383
+ null,
384
+ 3051729675574597004,
385
+ ],
386
+ is_current_ancestor: BooleanArray
387
+ [
388
+ true,
389
+ true,
390
+ ]"# ] ] ,
391
+ & [ ] ,
392
+ Some ( "made_current_at" ) ,
393
+ ) ;
394
+ }
265
395
}
0 commit comments