@@ -973,13 +973,13 @@ impl Client {
973973 for field_schema in schema. field_schema . iter ( ) {
974974 select_map
975975 . entry ( field_schema. field_type )
976- . or_insert ( vec ! [ String :: from( "timeseries_key" ) ] )
976+ . or_insert_with ( || vec ! [ String :: from( "timeseries_key" ) ] )
977977 . push ( format ! (
978- "anyIf(field_value, field_name = '{}') AS `{}` " ,
978+ "anyIf(field_value, field_name = '{}') AS {} " ,
979979 field_schema. name, field_schema. name
980980 ) ) ;
981981 top_selects. push ( format ! (
982- "{}_pivot.`{}` AS `{}` " ,
982+ "{}_pivot.{} AS {} " ,
983983 field_table_name( field_schema. field_type) ,
984984 field_schema. name,
985985 field_schema. name
@@ -992,10 +992,19 @@ impl Client {
992992 let mut field_types: Vec < oximeter:: FieldType > =
993993 select_map. keys ( ) . cloned ( ) . collect ( ) ;
994994 field_types. sort_by ( |a, b| {
995- select_map[ b] . len ( ) . cmp ( & select_map[ a] . len ( ) )
995+ select_map[ b]
996+ . len ( )
997+ . cmp ( & select_map[ a] . len ( ) )
998+ . then ( field_table_name ( * a) . cmp ( & field_table_name ( * b) ) )
996999 } ) ;
9971000
998- // Build a map from field type to pivot subquery.
1001+ // Build a map from field type to pivot subquery. We filter by
1002+ // timeseries_name, group by timeseries_key, and use anyIf to
1003+ // pivot fields to a wide table. We can use anyIf to take the
1004+ // first matching value because a given timeseries key is
1005+ // always associated with the same set of fields, so all rows
1006+ // with a given (timeseries_key, field_name) will have the same
1007+ // field_value.
9991008 let mut query_map: HashMap < oximeter:: FieldType , String > =
10001009 HashMap :: new ( ) ;
10011010 for field_type in field_types. clone ( ) {
@@ -1190,7 +1199,7 @@ mod tests {
11901199 AuthzScope , DatumType , FieldSchema , FieldSource , FieldType , Sample ,
11911200 TimeseriesSchema , Units ,
11921201 } ;
1193- use oximeter:: { FieldValue , types:: Cumulative } ;
1202+ use oximeter:: { FieldValue , TimeseriesName , types:: Cumulative } ;
11941203 use oxql_types:: { Table , Timeseries , point:: Points } ;
11951204 use std:: collections:: { BTreeMap , BTreeSet } ;
11961205 use std:: time:: Duration ;
@@ -1331,6 +1340,126 @@ mod tests {
13311340 TestContext { logctx, clickhouse : db, client, test_data }
13321341 }
13331342
1343+ #[ tokio:: test]
1344+ async fn test_get_fields_query ( ) {
1345+ let ctx = setup_oxql_test ( "test_get_fields_query" ) . await ;
1346+
1347+ let schema = ctx
1348+ . client
1349+ . schema_for_timeseries (
1350+ & TimeseriesName :: try_from ( "some_target:some_metric" ) . unwrap ( ) ,
1351+ )
1352+ . await
1353+ . unwrap ( )
1354+ . unwrap ( ) ;
1355+ let query = ctx. client . all_fields_query ( & schema, None ) . unwrap ( ) ;
1356+ let want = "SELECT
1357+ fields_i32_pivot.foo AS foo,
1358+ fields_u32_pivot.index AS index,
1359+ fields_string_pivot.name AS name,
1360+ fields_i32_pivot.timeseries_key AS timeseries_key
1361+ FROM
1362+ (
1363+ SELECT
1364+ timeseries_key,
1365+ anyIf(field_value, field_name = 'foo') AS foo
1366+ FROM oximeter.fields_i32
1367+ WHERE timeseries_name = 'some_target:some_metric'
1368+ GROUP BY timeseries_key
1369+ ) AS fields_i32_pivot
1370+ JOIN
1371+ (
1372+ SELECT
1373+ timeseries_key,
1374+ anyIf(field_value, field_name = 'name') AS name
1375+ FROM oximeter.fields_string
1376+ WHERE timeseries_name = 'some_target:some_metric'
1377+ GROUP BY timeseries_key
1378+ ) AS fields_string_pivot ON fields_i32_pivot.timeseries_key = fields_string_pivot.timeseries_key
1379+ JOIN
1380+ (
1381+ SELECT
1382+ timeseries_key,
1383+ anyIf(field_value, field_name = 'index') AS index
1384+ FROM oximeter.fields_u32
1385+ WHERE timeseries_name = 'some_target:some_metric'
1386+ GROUP BY timeseries_key
1387+ ) AS fields_u32_pivot ON fields_i32_pivot.timeseries_key = fields_u32_pivot.timeseries_key" ;
1388+ assert_eq ! (
1389+ want. split_whitespace( ) . collect:: <Vec <& str >>( ) . join( " " ) ,
1390+ query. split_whitespace( ) . collect:: <Vec <& str >>( ) . join( " " )
1391+ ) ;
1392+
1393+ ctx. cleanup_successful ( ) . await ;
1394+ }
1395+
1396+ #[ tokio:: test]
1397+ async fn test_get_fields ( ) {
1398+ let ctx = setup_oxql_test ( "test_get_fields" ) . await ;
1399+
1400+ #[ derive( Clone , Debug , oximeter:: Metric ) ]
1401+ struct Metric1 {
1402+ foo : i32 ,
1403+ bar : i32 ,
1404+ datum : Cumulative < u64 > ,
1405+ }
1406+ #[ derive( Clone , Debug , oximeter:: Metric ) ]
1407+ struct Metric2 {
1408+ foo : i32 ,
1409+ baz : i32 ,
1410+ datum : Cumulative < u64 > ,
1411+ }
1412+
1413+ // Insert samples for multiple metrics with partially overlapping field
1414+ // names and types. Then we'll query for one of those metrics and
1415+ // assert that we only get the expected fields, and not fields of the
1416+ // same name and type from another metric.
1417+ let samples = [
1418+ Sample :: new (
1419+ & SomeTarget { name : String :: from ( "ts1" ) , index : 1 } ,
1420+ & Metric1 { foo : 1 , bar : 2 , datum : Cumulative :: new ( 5 ) } ,
1421+ )
1422+ . unwrap ( ) ,
1423+ Sample :: new (
1424+ & SomeTarget { name : String :: from ( "ts1" ) , index : 1 } ,
1425+ & Metric1 { foo : 1 , bar : 2 , datum : Cumulative :: new ( 6 ) } ,
1426+ )
1427+ . unwrap ( ) ,
1428+ Sample :: new (
1429+ & SomeTarget { name : String :: from ( "ts2" ) , index : 1 } ,
1430+ & Metric2 { foo : 3 , baz : 4 , datum : Cumulative :: new ( 5 ) } ,
1431+ )
1432+ . unwrap ( ) ,
1433+ Sample :: new (
1434+ & SomeTarget { name : String :: from ( "ts2" ) , index : 1 } ,
1435+ & Metric2 { foo : 3 , baz : 4 , datum : Cumulative :: new ( 6 ) } ,
1436+ )
1437+ . unwrap ( ) ,
1438+ ] ;
1439+ ctx. client
1440+ . insert_samples ( & samples[ ..] )
1441+ . await
1442+ . expect ( "failed to insert samples" ) ;
1443+
1444+ let query = "get some_target:metric2 | filter timestamp > @2020-01-01" ;
1445+ let result = ctx
1446+ . client
1447+ . oxql_query ( query, QueryAuthzScope :: Fleet )
1448+ . await
1449+ . expect ( "failed to run OxQL query" ) ;
1450+
1451+ assert_eq ! ( result. tables. len( ) , 1 , "should be exactly 1 table" ) ;
1452+ let table = result. tables . get ( 0 ) . unwrap ( ) ;
1453+
1454+ assert_eq ! ( table. n_timeseries( ) , 1 , "should be exactly 1 series" ) ;
1455+ let series: Vec < & Timeseries > = table. timeseries ( ) . collect ( ) ;
1456+
1457+ assert_eq ! ( series[ 0 ] . fields. get( "foo" ) . unwrap( ) , & FieldValue :: I32 ( 3 ) ) ;
1458+ assert_eq ! ( series[ 0 ] . fields. get( "baz" ) . unwrap( ) , & FieldValue :: I32 ( 4 ) ) ;
1459+
1460+ ctx. cleanup_successful ( ) . await ;
1461+ }
1462+
13341463 #[ tokio:: test]
13351464 async fn test_get_entire_table ( ) {
13361465 let ctx = setup_oxql_test ( "test_get_entire_table" ) . await ;
0 commit comments